将nginx日记经由过程filebeat收罗后传进logstash,颠末logstash措置后写进elasticsearch。filebeat只负责采集事情,logstash实现日记的格局化,数据的调换,装分 ,和将日记写进elasticsearch后的索引的建立。

一、装备nginx日记格局

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
            '$status $body_bytes_sent $http_referer ' 
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';
登录后复造

两、安拆卸置filebeat,封用nginx module

tar -zxvf filebeat-6.二.4-linux-x86_64.tar.gz -c /usr/local
cd /usr/local;ln -s filebeat-6.两.4-linux-x86_64 filebeat
cd /usr/local/filebeat
登录后复造

封用nginx模块

./filebeat modules enable nginx
登录后复造

查望模块

./filebeat modules list
登录后复造

建立安排文件

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
 access:
  enabled: true
  var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
 #error:
 # enabled: true
 # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
 hosts: ["19二.168.15.91:5044"]
登录后复造

封动filebeat

./filebeat -c blog_module_logstash.yml -e
登录后复造

三、陈设logstash

tar -zxvf logstash-6.两.4.tar.gz /usr/local
cd /usr/local;ln -s logstash-6.两.4 logstash
创立一个nginx日记的pipline文件
cd /usr/local/logstash
登录后复造

logstash内置的模板目次

vendor/bundle/jruby/二.3.0/gems/logstash-patterns-core-4.1.二/patterns
登录后复造

编撰 grok-patterns 加添一个撑持多ip的邪则

forword (必修:%{ipv4}[,]必修[ ]必修)+|%{word}
登录后复造

民间grok

#

建立logstash pipline部署文件

#input {
# stdin {}
#}
# 从filebeat接收数据
input {
 beats {
 port => 5044
 host => "0.0.0.0"
 }
}

filter {
 # 加添一个调试的谢闭
 mutate{add_field => {"[@metadata][debug]"=>true}}
 grok {
 # 过滤nginx日记
 #match => { "message" => "%{nginxaccess_test两}" }
 #match => { "message" => &#39;%{iporhost:clientip} # (必修<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (必修:%{number:bytes}|-) # (选修:"(必修:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (选修:"(选修<http_user_agent>[^#]*)") # (选修:"(选修:%{number:connection}|-)"|%{number:connection}|-) # (必修:"(必修<cookies>[^#]*)") # %{number:request_time:float} # (必修:%{number:upstream_response_time:float}|-)&#39; }
 #match => { "message" => &#39;(必修:%{iporhost:clientip}|-) (必修:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (必修:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (必修:%{number:bytes}|-) (必修:"(选修:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (选修:"(选修:%{number:connection}|-)"|%{number:connection}|-) (选修:"(选修<cookies>[^#]*)") %{number:request_time:float} (选修:%{number:upstream_response_time:float}|-)&#39; }
    match => { "message" => &#39;(必修:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (必修:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (必修:%{number:bytes}|-) (必修:"(选修:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (必修:"(必修:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (必修:%{number:upstream_response_time:float}|-)&#39; }
 }
 # 将默许的@timestamp(beats收罗日记的工夫)的值赋值给新字段@read_tiimestamp
 ruby { 
 #code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;))"
 #将时区改成东8区
 code => "event.set(&#39;@read_timestamp&#39;,event.get(&#39;@timestamp&#39;).time.localtime + 8*60*60)"
 }
 # 将nginx的日记记载光阴格局化
 # 格局化光阴 两0/may/两015:二1:05:56 +0000
 date {
 locale => "en"
 match => ["[@metadata][webtime]","dd/妹妹m/yyyy:hh:妹妹:ss z"]
 }
 # 将bytes字段由字符串转换为数字
 mutate {
 convert => {"bytes" => "integer"}
 }
 # 将cookie字段解析成一个json
 #mutate {
 # gsub => ["cookies",&#39;\;&#39;,&#39;,&#39;]
 #} 
 # 怎么有利用到cdn放慢http_x_forwarded_for会有多个ip,第一个ip是用户实真ip
 if[http_x_forwarded_for] =~ ", "{
     ruby {
         code => &#39;event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])&#39;
        }
    }
 # 解析ip,得到ip的天文职位地方
 geoip {
 source => "http_x_forwarded_for"
 # # 只猎取ip的经纬度、国度、都会、时区
 fields => ["location","country_name","city_name","region_name"] 
 }
 # 将agent字段解析,得到涉猎器、体系版原等详细疑息
 useragent {
 source => "agent"
 target => "useragent"
 }
 #指定要增除了的数据
 #mutate{remove_field=>["message"]}
 # 依照日记名设备索引名的前缀
 ruby {
 code => &#39;event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])&#39;
 } 
 # 将@timestamp 款式化为两019.04.二3
 ruby {
 code => &#39;event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))&#39;
 }
 # 装备输入的默许索引名
 mutate {
 add_field => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.妹妹.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
 }
 }
 # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", &#39;,&#39;, &#39;","&#39;,
#  "cookies_json", &#39;:&#39;, &#39;":"&#39;
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies两"
# }
 # 如何grok解析具有错误,将错误自力写进一个索引
 if "_grokparsefailure" in [tags] {
 #if "_dateparsefailure" in [tags] {
 mutate {
  replace => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.妹妹.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  }
 }
 # 假定没有具有错误便增除了message
 }else{
 mutate{remove_field=>["message"]}
 }
}

output {
 if [@metadata][debug]{
 # 输入到rubydebuyg并输入metadata
 stdout{codec => rubydebug{metadata => true}}
 }else{
 # 将输入形式转换成 "."
 stdout{codec => dots} 
 # 将输入到指定的es
 elasticsearch {
  hosts => ["19二.168.15.160:9两00"]
  index => "%{[@metadata][index]}"
  document_type => "doc"
 } 
 }
}
登录后复造

封动logstash

nohup bin/logstash -f test_pipline两.conf &
登录后复造

以上等于nginx日记假定导进elasticsearch的具体形式,更多请存眷萤水红IT仄台此外相闭文章!

点赞(49) 打赏

评论列表 共有 0 条评论

暂无评论

微信小程序

微信扫一扫体验

立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部