********************* Deployment **************************
I. Scenario

High-traffic online services, and nginx services that report logs, produce a large volume of log data every day, and that data is valuable: it can feed metrics reporting, user-behavior analysis, interface-quality tracking, performance monitoring, and similar needs. With the traditional approach, however, the logs stay scattered across the individual nginx machines, and heavy log traffic is itself a strain on local disks.

We want to collect and aggregate these nginx logs centrally. The collection process and its result must satisfy the following requirements:
- different businesses can consume the data, e.g. monitoring, analytics/statistics, and recommendation;
- the data is delivered in real time;
- high performance is guaranteed.

II. Technical solution
Thanks to the high performance of OpenResty and Kafka, the requirements can be met in a very lightweight and efficient way. The architecture is as follows:

Solution outline:
1. Online requests hit nginx, and Lua tidies the logs: unifying the log format, filtering out invalid requests, grouping, and so on.
2. The nginx logs of different businesses are routed to different topics.
3. A producer implemented in Lua sends the logs to the Kafka cluster asynchronously.
4. The business teams interested in a given log stream consume its topic in real time.

III. Related technologies
- openresty:
- kafka:
- lua-resty-kafka:

IV. Installation and configuration
To keep things simple and direct, we deploy everything on a single machine; a cluster setup is similar.

1) Install the OpenResty build dependencies:

apt-get install libreadline-dev libncurses5-dev libpcre3-dev libssl-dev perl make build-essential
# or
yum install readline-devel pcre-devel openssl-devel gcc

2) Download and build OpenResty:
# 1: install OpenResty:
cd /opt/nginx/   # directory holding the installation files
wget
tar -xzf openresty-1.9.7.4.tar.gz

# Configure: install into /opt/openresty (the default is /usr/local)
./configure --prefix=/opt/openresty \
            --with-luajit \
            --without-http_redis2_module \
            --with-http_iconv_module
make
make install

3) Install lua-resty-kafka
# Download lua-resty-kafka:
wget
unzip lua-resty-kafka-master.zip -d /opt/nginx/

# Copy lua-resty-kafka into openresty
mkdir /opt/openresty/lualib/kafka
cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/

4) Install a single-node Kafka
cd /opt/nginx/
wget
tar xvf kafka_2.10-0.9.0.1.tgz

# Start a single-node zookeeper
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 &

# Bind the broker IP; this is mandatory.
# Set host.name in config/server.properties:
host.name={your_server_ip}

# Start the Kafka service
nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 &

# Create a test topic
sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1

V. Configuration and running
Edit /opt/openresty/nginx/conf/nginx.conf to have nginx logs recorded into Kafka. The source is as follows:
worker_processes 12;

events {
    use epoll;
    worker_connections 65535;
}

http {
    include mime.types;
    default_type application/octet-stream;
    sendfile on;
    keepalive_timeout 0;
    gzip on;
    gzip_min_length 1k;
    gzip_buffers 4 8k;
    gzip_http_version 1.1;
    gzip_types text/plain application/x-javascript text/css application/xml application/X-JSON;
    charset UTF-8;

    # backend proxy servers
    upstream rc {
        server 10.10.*.15:8080 weight=5 max_fails=3;
        server 10.10.*.16:8080 weight=5 max_fails=3;
        server 10.16.*.54:8080 weight=5 max_fails=3;
        server 10.16.*.55:8080 weight=5 max_fails=3;
        server 10.10.*.113:8080 weight=5 max_fails=3;
        server 10.10.*.137:8080 weight=6 max_fails=3;
        server 10.10.*.138:8080 weight=6 max_fails=3;
        server 10.10.*.33:8080 weight=4 max_fails=3;
        # maximum number of idle keepalive connections
        keepalive 32;
    }

    # path of the lua dependency libraries
    lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";

    server {
        listen 80;
        server_name localhost;

        location /favicon.ico {
            root html;
            index index.html index.htm;
        }

        location / {
            proxy_connect_timeout 8;
            proxy_send_timeout 8;
            proxy_read_timeout 8;
            proxy_buffer_size 4k;
            proxy_buffers 512 8k;
            proxy_busy_buffers_size 8k;
            proxy_temp_file_write_size 64k;
            proxy_next_upstream http_500 http_502 http_503 http_504 error timeout invalid_header;
            root html;
            index index.html index.htm;
            proxy_pass http://rc;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            # Wrap the lua code in log_by_lua: the log_by_lua directive runs
            # at the very end of the request and does not interfere with the
            # proxy_pass mechanism.
            log_by_lua '
                -- load the required lua modules
                local cjson = require "cjson"
                local producer = require "resty.kafka.producer"
                -- kafka broker list; the IP must match kafka''s host.name setting
                local broker_list = {
                    { host = "10.10.78.52", port = 9092 },
                }
                -- collect the log fields into a table for easy json encoding
                local log_json = {}
                log_json["uri"] = ngx.var.uri
                log_json["args"] = ngx.var.args
                log_json["host"] = ngx.var.host
                log_json["request_body"] = ngx.var.request_body
                log_json["remote_addr"] = ngx.var.remote_addr
                log_json["remote_user"] = ngx.var.remote_user
                log_json["time_local"] = ngx.var.time_local
                log_json["status"] = ngx.var.status
                log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
                log_json["http_referer"] = ngx.var.http_referer
                log_json["http_user_agent"] = ngx.var.http_user_agent
                log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
                log_json["upstream_response_time"] = ngx.var.upstream_response_time
                log_json["request_time"] = ngx.var.request_time
                -- encode the record to a json string
                local message = cjson.encode(log_json)
                -- create an async kafka producer
                local bp = producer:new(broker_list, { producer_type = "async" })
                -- send the log message; the second argument of send is the kafka
                -- routing key:
                --   nil key: writes to the same partition for a period of time
                --   explicit key: routed to the partition given by hash(key)
                local ok, err = bp:send("test1", nil, message)
                if not ok then
                    ngx.log(ngx.ERR, "kafka send err:", err)
                    return
                end
            ';
        }

        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}
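The key-based routing described in the Lua comments above can be illustrated with a small sketch. This is not lua-resty-kafka's actual partitioner (its internals may differ); it is a hypothetical model of the stated rule, "nil key: keep writing to one partition for a while; explicit key: hash(key) modulo partition count":

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count for the topic

def choose_partition(key, current_partition=0):
    """Pick a partition for a message.

    key is None -> stick with the current partition (the producer rotates
                   it periodically, not per message).
    key given   -> deterministic hash(key) % partition_count, so messages
                   sharing a key always land on the same partition.
    """
    if key is None:
        return current_partition
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

# Messages with the same key always map to the same partition:
assert choose_partition("user-42") == choose_partition("user-42")
# A nil/None key just keeps the current (sticky) partition:
assert choose_partition(None, current_partition=3) == 3
```

This is why a key is useful when per-user or per-device ordering matters: Kafka only preserves ordering within a partition.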
VI. Checking & running
# Check the configuration; this only validates the nginx config itself.
# Lua runtime errors end up in nginx's error.log.
./nginx -t -c /opt/openresty/nginx/conf/nginx.conf
# Start
./nginx -c /opt/openresty/nginx/conf/nginx.conf
# Reload
./nginx -s reload

VII. Testing
1. Send an arbitrary HTTP request to this nginx instance.

2. Check that the upstream proxy is working properly.

3. Check that the Kafka topic for these logs is receiving messages:

# Consume the topic data from the beginning
sh kafka-console-consumer.sh --zookeeper 10.10.78.52:2181 --topic test1 --from-beginning

Performance measurements:
4. ab load tests

# nginx + upstream only:
ab -n 10000 -c 100 -k

# Results
Server Software:        nginx
Server Hostname:        10.10.34.15
Server Port:            80

Document Path:          /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com
Document Length:        13810 bytes

Concurrency Level:      100
Time taken for tests:   2.148996 seconds
Complete requests:      10000
Failed requests:        9982
   (Connect: 0, Length: 9982, Exceptions: 0)
Write errors:           0
Keep-Alive requests:    0
Total transferred:      227090611 bytes
HTML transferred:       225500642 bytes
Requests per second:    4653.34 [#/sec] (mean)
Time per request:       21.490 [ms] (mean)
Time per request:       0.215 [ms] (mean, across all concurrent requests)
Transfer rate:          103196.10 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       2
Processing:     5   20  23.6     16     701
Waiting:        4   17  20.8     13     686
Total:          5   20  23.6     16     701

Percentage of the requests served within a certain time (ms)
  50%     16
  66%     20
  75%     22
  80%     25
  90%     33
  95%     41
  98%     48
  99%     69
 100%    701 (longest request)

(The high "Failed requests" count is not alarming: ab flags any response whose body length differs from the first response as a Length failure, and this endpoint returns dynamic content.)
# nginx + upstream + log_by_lua Kafka hook:
ab -n 10000 -c 100 -k

# Results
Server Software:        openresty/1.9.7.4
Server Hostname:        10.10.78.52
Server Port:            80

Document Path:          /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com
Document Length:        34396 bytes

Concurrency Level:      100
Time taken for tests:   2.234785 seconds
Complete requests:      10000
Failed requests:        9981
   (Connect: 0, Length: 9981, Exceptions: 0)
Write errors:           0
Keep-Alive requests:    0
Total transferred:      229781343 bytes
HTML transferred:       228071374 bytes
Requests per second:    4474.70 [#/sec] (mean)
Time per request:       22.348 [ms] (mean)
Time per request:       0.223 [ms] (mean, across all concurrent requests)
Transfer rate:          100410.10 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       3
Processing:     6   20  27.6     17    1504
Waiting:        5   15  12.0     14     237
Total:          6   20  27.6     17    1504

Percentage of the requests served within a certain time (ms)
  50%     17
  66%     19
  75%     21
  80%     23
  90%     28
  95%     34
  98%     46
  99%     67
 100%   1504 (longest request)
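Comparing the two ab runs, the log_by_lua Kafka hook costs only about 4% of throughput (4653.34 vs 4474.70 requests/sec), which supports the high-performance claim. Note, though, that the two tests returned different document lengths, so this is only a rough comparison. The arithmetic:

```python
# Requests-per-second figures taken from the two ab runs above.
rps_plain = 4653.34   # nginx + upstream only
rps_kafka = 4474.70   # nginx + upstream + log_by_lua Kafka producer

overhead_pct = (rps_plain - rps_kafka) / rps_plain * 100
print(f"throughput overhead: {overhead_pct:.1f}%")  # prints "throughput overhead: 3.8%"
```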
********************* The key module **************************
The nginx configuration file is as follows:
#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    upstream myServer {
        server 192.168.0.109:8080 weight=1;
    }

    lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";
    lua_need_request_body on;

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location /test1 {
            # forward the request to the custom upstream server list
            proxy_pass http://myServer;
        }

        location /test2 {
            # Wrap the lua code in log_by_lua: the log_by_lua directive runs
            # at the very end of the request and does not interfere with the
            # proxy_pass mechanism.
            log_by_lua '
                -- load the required lua modules
                local topic = "test"
                local cjson = require "cjson"
                local producer = require "resty.kafka.producer"
                -- kafka broker list; the IPs must match kafka''s host.name setting
                local broker_list = {
                    { host = "192.168.0.109", port = 9092 },
                    { host = "192.168.0.110", port = 9092 },
                    { host = "192.168.0.101", port = 9092 }
                }
                -- collect the log fields into a table for easy json encoding
                local log_json = {}
                log_json["uri"] = ngx.var.uri
                log_json["args"] = ngx.req.get_uri_args()
                log_json["host"] = ngx.var.host
                log_json["request_body"] = ngx.var.request_body
                log_json["remote_addr"] = ngx.var.remote_addr
                log_json["remote_user"] = ngx.var.remote_user
                log_json["time_local"] = ngx.var.time_local
                log_json["status"] = ngx.var.status
                log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
                log_json["http_referer"] = ngx.var.http_referer
                log_json["http_user_agent"] = ngx.var.http_user_agent
                log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
                log_json["upstream_response_time"] = ngx.var.upstream_response_time
                log_json["request_time"] = ngx.var.request_time
                -- encode the record to a json string
                local message = cjson.encode(log_json)
                -- create an async kafka producer
                local bp = producer:new(broker_list, { producer_type = "async" })
                -- send the log message; the second argument of send is the kafka
                -- routing key:
                --   nil key: writes to the same partition for a period of time
                --   explicit key: routed to the partition given by hash(key)
                local ok, err = bp:send(topic, nil, message)
                if not ok then
                    ngx.log(ngx.ERR, "kafka send err:", err)
                    return
                end
            ';
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }

    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;
    #    location / {
    #        root   html;
    #        index  index.html  index.htm;
    #    }
    #}

    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;
    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;
    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;
    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;
    #    location / {
    #        root   html;
    #        index  index.html  index.htm;
    #    }
    #}
}
********************* Pitfalls **************************
Problem summary:

A Lua script running in OpenResty nginx on server1 was writing data into Kafka on server5 and failed with a resolver error (no resolver defined to resolve "xxxxx"), where xxxxx is the hostname of one of the machines. After a day of digging, the cause emerged.
Root cause:

It turned out that OpenResty does not consult the hosts-file mappings. The Kafka client first connects to a broker by IP, then fetches the broker cluster metadata registered in zookeeper, where the broker address is recorded as a hostname (e.g. kafka236:1111). The Lua client therefore gets back the name kafka236 and needs its IP, but since resolution never goes through the hosts file, it fails with the "no resolver" error.
Solution

If there is a DNS service on your network (e.g. on the router), simply add a record for the hostname there and point nginx at that DNS server; otherwise you need to stand up your own DNS service.

nginx.conf configuration:

DNS configuration:
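As a minimal sketch (the 10.10.0.2 DNS address below is a hypothetical example), the nginx side needs a resolver directive inside the http block so that OpenResty's cosockets can resolve the broker hostnames recorded in zookeeper:

```nginx
http {
    # Hypothetical internal DNS server that can resolve the broker
    # hostnames (e.g. kafka236) returned in the Kafka metadata.
    resolver 10.10.0.2 valid=300s;
    resolver_timeout 3s;
}
```

On the DNS side, all that is needed is an A record mapping each broker hostname (kafka236, etc.) to its IP.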
Notes:

1. If the Kafka server is configured with an IP or a hostname, a Kafka client on the same machine cannot connect via localhost (unless the server itself is configured as localhost).
2. If the Kafka server's listener is configured with an IP, zookeeper records the IP address;
   if the listener is configured with a hostname, zookeeper records the hostname;
   if advertised.listeners is set to a hostname, zookeeper records the hostname, regardless of what the listener is set to.
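For example (illustrative values, 0.9.x-era property names), the broker side of these rules lives in config/server.properties:

```properties
# config/server.properties -- illustrative values

# With a hostname here, zookeeper records the hostname kafka236;
# with an IP, it records the IP:
listeners=PLAINTEXT://kafka236:9092

# If set, advertised.listeners wins regardless of listeners, and its
# value (hostname or IP) is what clients such as lua-resty-kafka get
# back in the broker metadata:
#advertised.listeners=PLAINTEXT://10.10.78.52:9092
```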
A later finding:

With the older openresty-1.7.10.2, Kafka was reachable whether it was configured with hostnames or IPs. With the newer openresty-1.13.6.2, hostnames no longer work; only IPs do, and configuring a resolver did not help either.