Find the Underlying Structure of Big Data by Machine Learning with Spark | QQ群: 452154809

Flume1.6整合Kafka注意事项

Flume Bin 1013℃ 0评论

Flume升级到1.6之后本身就支持和Kafka的连接,见1.6特性:

     1.Flume Sink and Source for Apache Kafka

     2. A new channel that uses Kafka

但是会有这样的需求:拼接不同数据源的数据,此时就需要我们特别处理:

    1 Flume 发送消息时指定数据源join的字段为key【设置发送event的head中的key=”为Join的字段”】,发送的message不变 .具体见KafkaSink源码:

for (; processedEvents < batchSize; processedEvents += 1) {


event = channel.take();
if (event == null) {
break;
// no events available in channel
}
byte[] eventBody = event.getBody();
Map headers = event.getHeaders();
if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}
eventKey = headers.get(KEY_HDR); // KEY_HDR="key"
if (logger.isDebugEnabled()) {
logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
+ new String(eventBody, "UTF-8"));
logger.debug("event #{}", processedEvents);
}
// create a message and add to buffer
KeyedMessage data = new KeyedMessage(eventTopic, eventKey, eventBody);
messageList.add(data); // messageList为批量发送的消息
}

    2 指定producer.sinks.r.partitioner.class = kafka.producer.DefaultPartitioner,指定发送消息的key后,按Key的Hash值进行策略分发,保证需要Join的来源不同的数发送到Kafka的同一个Partition下。据具体见DefaultPartitioner关键代码:


private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
private val random = new java.util.Random
def partition(key: T, numPartitions: Int): Int = {
if(key == null)
{
println("key is null")
random.nextInt(numPartitions)
}
else
{
println("key is "+ key + " hashcode is "+key.hashCode)
math.abs(key.hashCode) % numPartitions
}
}
}

    另外注意,若如我们需完善Kafka-sink,自定义的Kafka-sink的包名需在org.apache.flume.plugins下,主要是为了支持flume内置计数器。flume的http监控代码里有这样一句话

if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
continue;
}

    如果你的组件package的name不是以org.apache.flume开头的,它就跳过你这个组件,不去获取相关的监控信息,所以在监控页面上你找不到的组件信息,你可能认为你的组件出问题了。根本原因就在这里,当然你也可以改源码,删掉这句话就行了!!

参考:

1 flume的自定义组件如何才能被flume的httpmetricsServer监控起来

2 org.apache.flume.sink.kafka.KafkaSink源码

3 kafka producer 中partition 使用方式

转载请注明:单向街的夏天 » Flume1.6整合Kafka注意事项

喜欢 (5)or分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址