从kafka获取数据,存到mongodb中。适合空间查询geo_point设置。配置文件如下:
input {
kafka { type => "test" auto_offset_reset => "smallest" group_id => "m2" topic_id => "db100" zk_connect => "192.168.199.6:2181,192.168.199.7:2181,192.168.199.8:2181" }}filter { mutate { split => { "message" => "," } add_field => { "id" => "%{message[1]}" "SYZTDM_s" => "%{message[55]}" "lat" => "%{message[6]}" "lon" => "%{message[7]}" "loc" => "%{message[6]}" } remove_field => [ "message" ] remove_field => [ "path" ] remove_field => [ "host" ] remove_field => [ "type" ]}mutate{ convert => {"lat" => "float"} convert => { "lon" => "float"} convert => {"loc" => "float"}}mutate{ merge => {"loc" =>"lon"}}}output { mongodb { collection => "base" database => "fragment" uri => "mongodb://192.168.199.7:27017" }}
注意:
1.logstash需要安装mongodb插件,默认没有安装的。(bin/logstash-plugin install logstash-output-mongodb)
2.插入方式是insert方式,是单个插入。
3.geo_point查询方式是数组类型的。