博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume使用小结
阅读量:6423 次
发布时间:2019-06-23

本文共 5042 字,大约阅读时间需要 16 分钟。

    本文介绍初次使用Flume传输数据到MongoDB的过程,内容涉及环境部署和注意事项。

1 环境搭建

    需要jdk、flume-ng、mongodb java driver、flume-ng-mongodb-sink

(1)jdk下载地址:
(2)flune-ng下载地址:
(3)mongodb java driver jar包下载地址:
(4)flume-ng-mongodb-sink 源码下载地址:
flume-ng-mongodb-sink 需要自己编译jar包,从github上下载代码,解压之后执行mvn package,即可生成。需要先安装maven用于编译jar包,且机器需要能联网。

2 简单原理介绍

    这是一个关于池子的故事。有一个池子,它一头进水,另一头出水,进水口可以配置各种管子,出水口也可以配置各种管子,可以有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语成为Sink、池子术语成为Channel,Source+Channel+Sink,术语称为Agent。如果有需要,还可以把多个Agent连起来。

更多细节参考官方文档:

3 Flume配置

(1)  env配置

      将mongo-java-driver和flume-ng-mongodb-sink两个jar包放到flume\lib目录下,并将路径加入到flume-env.sh文件的FLUME_CLASSPATH变量中;

  JAVA_OPTS变量: 加上-Dflume.monitoring.type=http -Dflume.monitoring.port=xxxx,可以在[hostname:xxxx]/metrics 上看到监控信息;  -Xms指定JVM初始内存,-Xmx指定JVM最大内存
  FLUME_HOME变量: 设定FLUME根目录
  JAVA_HOME变量:  设定JAVA根目录

(2) log配置

      在调试时,将日志设置为debug并打到文件:flume.root.logger=DEBUG,LOGFILE

(3) 传输配置
        采用 Exec Source、file-channel、flume-ng-mongodb-sink。
    Source配置举例:

my_agent.sources.my_source_1.channels = my_channel_1my_agent.sources.my_source_1.type = execmy_agent.sources.my_source_1.command = python  xxx.pymy_agent.sources.my_source_1.shell = /bin/bash -cmy_agent.sources.my_source_1.restartThrottle = 10000my_agent.sources.my_source_1.restart = truemy_agent.sources.my_source_1.logStdErr = truemy_agent.sources.my_source_1.batchSize = 1000my_agent.sources.my_source_1.interceptors = i1 i2 i3my_agent.sources.my_source_1.interceptors.i1.type = staticmy_agent.sources.my_source_1.interceptors.i1.key = dbmy_agent.sources.my_source_1.interceptors.i1.value = cswuyg_testmy_agent.sources.my_source_1.interceptors.i2.type = staticmy_agent.sources.my_source_1.interceptors.i2.key = collectionmy_agent.sources.my_source_1.interceptors.i2.value = cswuyg_testmy_agent.sources.my_source_1.interceptors.i3.type = staticmy_agent.sources.my_source_1.interceptors.i3.key = opmy_agent.sources.my_source_1.interceptors.i3.value = upsert

    字段说明:

    采用exec source,指定执行命令行为python  xxx.py,我在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;
static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。
    Channel配置举例:

my_agent.channels.my_channel_1.type = filemy_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPointmy_agent.channels.my_channel_1.useDualCheckpoints = truemy_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/datamy_agent.channels.my_channel_1.transactionCapacity = 10000my_agent.channels.my_channel_1.checkpointInterval = 30000my_agent.channels.my_channel_1.maxFileSize = 4292870142my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000my_agent.channels.my_channel_1.capacity = 100000

    字段说明:

    要注意的参数是capacity,它指定了池子里可以存放的Event数量,需要根据日志量设置一个合适的值,如果你也采用file-channel,而且磁盘充足,那可以尽可能的设置得大些。

    dataDirs指定池子存放的位置,如果可以,选择IO不是那么高的磁盘,可以使用逗号分隔使用多个磁盘目录。

    sink配置举例:

my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSinkmy_agent.sinks.my_mongo_1.host = xxxhostmy_agent.sinks.my_mongo_1.port = yyyportmy_agent.sinks.my_mongo_1.model = dynamicmy_agent.sinks.my_mongo_1.batch = 10my_agent.sinks.my_mongo_1.channel = my_channel_1my_agent.sinks.my_mongo_1.timestampField = _S

    字段说明:

 model选择dynamic,表示mongodb的db、collection名字采用Event Header中指定的名字。timestampField 字段用于将json串中指定键的值转换为datetime格式存进mongodb,flume-ng-mongodb-sink不支持嵌套key指定(如:_S.y),但可以自己通过修改sink的代码来实现。

    agent配置举例:

my_agent.channels = my_channel_1my_agent.sources = my_source_1my_agent.sinks = my_mongo_1

(4) 启动

    可以写一个control.sh 脚本来控制flume的启动、关闭、重启。

    启动demo:
./bin/flume-ng agent --conf ./conf/ --conf-file ./conf/flume.conf -n agent1 > ./start.log 2>&1 &

    从此以后,日志数据就从日志文件,通过xxx.py读取,进入到flie-channel,再被flume-ng-mongodb-sink读走,进入到目的地MongoDB Cluster。
搭好基本功能之后,以后需要做的就是调整xxx.py、增强flume-ng-mongodb-sink。

4 其它

 1、监控:官方推荐的监控是ganglia:,有图像界面。

 2、版本变更:flume 从1.X开始已经不再使用ZooKeeper,在数据可靠性上,提供了E2E(end-to-end)的支持,去掉了重构之前的DFO(store on failure)、BE(best effort)。E2E指的是:在删除channel中的event时,保证event已经传递到了下一个agent或者终点,不过,这里没有提到数据在进入到channel之前如何保证不丢失,像Exec Source这种数据导入channel的方式,需要使用者自己保证。

 3、关闭插件:使用Exec Source时,flume重启不会关闭掉旧插件进程,需要自己关闭。

 4、Exec Source不能保证数据不丢失,因为这种方式只是把水灌到池子里,不管池子是什么状况, 参见 的 Warning 部分。但是,Spooling directory source 也不一定是个好方法,监控目录,但是注意不能修改文件的名字,不能出现同名覆盖文件,不要出现只有一半内容的文件。传输完成之后,文件会被重命名为xx.COMPLETED,需要有定时清理脚本把这些文件清理掉。重启会导致出现重复event,因为那些被传输到一半的文件没有被设置为完成状态。

 5、传输瓶颈:使用flume+mongodb来安全传输大量数据(每秒万条级别的日志不算大数据量,每天几百G的也不算),瓶颈会出现在MongoDB上,特别是Update类型的数据传输。

 6、需要修改当前的flume-ng-mongodb-sink 插件:(1)让update支持 $setOnInsert;(2)解决update的 $set、$inc为空时,引发exception的bug;(3)解决批量插入时,因其中一条日志有duplicate exception而导致同批插入的后续日志全部被丢弃的bug。

 7、flume跟fluentd很类似,但来自hadoop生态的flume更热门,所以我选择flume。

 8、批量部署:先把jdk、flume打包成tar,然后借助python 的 paramiko库,将tar包发到各台机器上,解压、运行。

 

本文所在: 

参考:

1、http://flume.apache.org/FlumeDeveloperGuide.html

2、《Apache Flume: Distributed Log Collection for Hadoop》 

 

转载地址:http://zprra.baihongyu.com/

你可能感兴趣的文章
房地产英语 Real estate词汇
查看>>
python接口自动化测试(八)-unittest-生成测试报告
查看>>
第 26 章 MySQL
查看>>
How far away ?(DFS)
查看>>
C#中三种截屏方式总结
查看>>
EF架构~LinqToEntity里实现left join的一对一与一对多
查看>>
Spring.net 学习笔记之ASP.NET底层架构
查看>>
C# System.Windows.Forms.WebBrowser中判断浏览器内核和版本
查看>>
Java 动态太极图 DynamicTaiChi (整理)
查看>>
Web APi之Web Host消息处理管道(六)
查看>>
微信公众平台后台编辑器上线图片缩放和封面图裁剪功能
查看>>
git使用教程2-更新github上代码
查看>>
张掖百公里,再次折戟
查看>>
SAP QM Batch to Batch的转移过账事务中的Vendor Batch
查看>>
本期最新 9 篇论文,帮你完美解决「读什么」的问题 | PaperDaily #19
查看>>
图解SSIS监视文件夹并自动导入数据
查看>>
Lucene.Net 2.3.1开发介绍 —— 四、搜索(一)
查看>>
人工智能将如何变革视频监控行业?
查看>>
MyBatis Review——开发Dao的方法
查看>>
只在UnitTest和WebHost中的出现的关于LogicalCallContext的严重问题
查看>>