Flume的一些组件(如Spooling Directory Source、File Channel)能够保证agent挂掉后不丢失数据。
1、负载均衡
1)Load balancing Sink Processor
source里的event流经channel,进入sink组,在sink组内部根据负载算法(round_robin、random)选择sink,后续可以选择不同机器上的agent实现负载均衡。
实例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.channels=c1 a1.sources.r1.command=tail -F /home/flume/xx.log #define sinkgroups a1.sinkgroups=g1 a1.sinkgroups.g1.sinks=k1 k2 a1.sinkgroups.g1.processor.type=load_balance a1.sinkgroups.g1.processor.backoff=true a1.sinkgroups.g1.processor.selector=round_robin #define the sink 1 a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.1.112 a1.sinks.k1.port=9876 #define the sink 2 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.1.113 a1.sinks.k2.port=9876 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel=c1 |
2)Load balancing Log4J Appender
不同的agent处理同一个client产生的数据。
1 2 3 | log4j.rootLogger=INFO,flume log4j.appender.flume = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.flume.Hosts = 192.168.1.111:41414 192.168.1.111:41414 |
2、故障转移
Failover Sink Processor
配置一组sink,这组sink组成一个Failover Sink Processor,当有一个sink处理失败,Flume将这个sink放到一个地方,等待冷却时间,可以正常处理event时再拿回来。
event通过通过一个channel流向一个sink组,在sink组内部根据优先级选择具体的sink,一个失败后再转向另一个sink,流程图如下:
实例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.channels=c1 a1.sources.r1.command=tail -F /home/flume/xx.log #define sinkgroups a1.sinkgroups=g1 a1.sinkgroups.g1.sinks=k1 k2 a1.sinkgroups.g1.processor.type=failover a1.sinkgroups.g1.processor.priority.k1=10 a1.sinkgroups.g1.processor.priority.k2=5 a1.sinkgroups.g1.processor.maxpenalty=10000 #define the sink 1 a1.sinks.k1.type=avro a1.sinks.k1.hostname=192.168.1.112 a1.sinks.k1.port=9876 #define the sink 2 a1.sinks.k2.type=avro a1.sinks.k2.hostname=192.168.1.113 a1.sinks.k2.port=9876 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel=c1 |