A worked example of importing nginx logs into Elasticsearch

The nginx logs are collected by Filebeat and shipped to Logstash, which processes them and writes them to Elasticsearch. Filebeat is only responsible for collection; Logstash handles formatting the log lines, substituting and splitting fields, and creating the indices under which the logs are written to Elasticsearch.

1. Configure the nginx log format

  log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
                  '$status $body_bytes_sent $http_referer '
                  '"$http_user_agent" '
                  '"$connection" '
                  '"$http_cookie" '
                  '$request_time '
                  '$upstream_response_time';
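For reference, a log line produced by this format looks roughly like the one below (all values are made up for illustration). The quoted user agent, connection and cookie fields and the trailing request/upstream times are what the grok pattern in step 3 has to pick apart:

  1.2.3.4 5.6.7.8, 1.2.3.4 [23/Apr/2019:10:51:24 +0800] blog.cnfol.com GET /index.html HTTP/1.1 200 1024 - "Mozilla/5.0 (Windows NT 10.0)" "12345" "uid=abc; path=/" 0.010 0.008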

2. Install and configure Filebeat, and enable the nginx module

  tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
  cd /usr/local; ln -s filebeat-6.2.4-linux-x86_64 filebeat
  cd /usr/local/filebeat

Enable the nginx module:

  ./filebeat modules enable nginx    

List the modules:

  ./filebeat modules list    

Create the configuration file:

  vim /usr/local/filebeat/blog_module_logstash.yml

  filebeat.modules:
  - module: nginx
    access:
      enabled: true
      var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
    #error:
    #  enabled: true
    #  var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]

  output.logstash:
    hosts: ["192.168.15.91:5044"]
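Optionally, before starting Filebeat you can sanity-check the configuration file and the connection to the Logstash endpoint with Filebeat's built-in test subcommands (available in the 6.x releases):

  ./filebeat test config -c blog_module_logstash.yml
  ./filebeat test output -c blog_module_logstash.yml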

Start Filebeat:

  ./filebeat -c blog_module_logstash.yml -e    

3. Configure Logstash

  tar -zxvf logstash-6.2.4.tar.gz -C /usr/local
  cd /usr/local; ln -s logstash-6.2.4 logstash
  cd /usr/local/logstash

A pipeline file for the nginx logs will be created in this directory (see "Create the Logstash pipeline configuration file" below).

Logstash's built-in grok pattern directory:

  vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

Edit grok-patterns and add a pattern that supports multiple IPs:

  FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}
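The FORWORD pattern matches either one or more IPv4 addresses separated by commas and optional spaces (the typical $http_x_forwarded_for value when requests pass through a CDN or other proxies), or a single word. Some illustrative values it accepts:

  1.2.3.4
  1.2.3.4, 5.6.7.8
  1.2.3.4,5.6.7.8, 9.10.11.12
  unknown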

Official grok patterns:

#

Create the Logstash pipeline configuration file (referred to as test_pipline2.conf when starting Logstash below):

  #input {
  #  stdin {}
  #}
  # Receive data from filebeat
  input {
    beats {
      port => 5044
      host => "0.0.0.0"
    }
  }

  filter {
    # Add a debug switch
    mutate { add_field => { "[@metadata][debug]" => true } }
    grok {
      # Parse the nginx log
      #match => { "message" => "%{NGINXACCESS_TEST2}" }
      #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
      #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
      match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
    }
    # Copy the default @timestamp (the time beats collected the log) into a new field @read_timestamp
    ruby {
      #code => "event.set('@read_timestamp',event.get('@timestamp'))"
      # Shift the timezone to UTC+8
      code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
    }
    # Parse the timestamp recorded in the nginx log
    # Time format: 20/May/2015:21:05:56 +0000
    date {
      locale => "en"
      match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
    }
    # Convert the bytes field from string to integer
    mutate {
      convert => {"bytes" => "integer"}
    }
    # Parse the cookie field into JSON
    #mutate {
    #  gsub => ["cookies",';',',']
    #}

    # Behind a CDN, http_x_forwarded_for contains several IPs; the first one is the real client IP
    if [http_x_forwarded_for] =~ ", " {
      ruby {
        code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
      }
    }
    # Resolve the IP to a geographic location
    geoip {
      source => "http_x_forwarded_for"
      # Only keep the coordinates, country, city and region
      fields => ["location","country_name","city_name","region_name"]
    }
    # Parse the agent field to get the browser, OS version and other details
    useragent {
      source => "agent"
      target => "useragent"
    }
    # Fields to delete
    #mutate{remove_field=>["message"]}
    # Use the log file name as the index name prefix
    ruby {
      code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
    }
    # Format @timestamp as e.g. 2019.04.23
    ruby {
      code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
    }
    # Set the default output index name
    mutate {
      add_field => {
        #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}"
        "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
      }
    }
    # Parse the cookies field into JSON
    #mutate {
    #  gsub => [
    #    "cookies", ";", ",",
    #    "cookies", "=", ":"
    #  ]
    #  #split => {"cookies" => ","}
    #}
    #json_encode {
    #  source => "cookies"
    #  target => "cookies_json"
    #}
    #mutate {
    #  gsub => [
    #    "cookies_json", ',', '","',
    #    "cookies_json", ':', '":"'
    #  ]
    #}
    #json {
    #  source => "cookies_json"
    #  target => "cookies2"
    #}
    # If grok fails to parse, write the failed events to a separate index
    if "_grokparsefailure" in [tags] {
    #if "_dateparsefailure" in [tags] {
      mutate {
        replace => {
          #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
          "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
        }
      }
    # If there was no error, drop the message field
    } else {
      mutate{remove_field=>["message"]}
    }
  }

  output {
    if [@metadata][debug] {
      # Print to stdout with rubydebug, including metadata
      stdout { codec => rubydebug { metadata => true } }
    } else {
      # Condense stdout output to one "." per event
      stdout { codec => dots }
      # Write to the target Elasticsearch
      elasticsearch {
        hosts => ["192.168.15.160:9200"]
        index => "%{[@metadata][index]}"
        document_type => "doc"
      }
    }
  }

Start Logstash:

  nohup bin/logstash -f test_pipline2.conf &
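Once events are flowing, you can confirm that the per-log-file indices (and the _failure index, if grok failures occur) are being created by querying Elasticsearch's cat API on the host configured in the output section:

  curl -s 'http://192.168.15.160:9200/_cat/indices?v'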

That is all for this article. I hope it is helpful for your learning.
