LL created FLUME-3236:
-------------------------

             Summary: Using the Kafka channel, but the content stored in Kafka is abnormal
                 Key: FLUME-3236
                 URL: https://issues.apache.org/jira/browse/FLUME-3236
             Project: Flume
          Issue Type: Bug
            Reporter: LL


I am using a macOS environment with Kafka 2.11-1.1.0 and Flume 1.8.

I wrote a client with the Flume SDK on another machine, like this:

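import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// MyRpcClientFacade is my own wrapper around the Flume SDK client (not shown here)
MyRpcClientFacade client = new MyRpcClientFacade();
client.init();
List<Event> buffer = new ArrayList<Event>();
String sampleDataitem = "{\"cmd\":\"add\", \"fields\": {\"itemid\": \"28394556\", "
        + "\"cateid\": \"9_2_1\", \"score\": 459, \"title\": \"天穿修炼记最新版\", "
        + "\"item_tags\": \"修仙\"}}";
String sampleDatauser = "{\"cmd\":\"add\", \"fields\":{\"timestamp\":1489040079, "
        + "\"action_type\":\"view\", \"userid\": \"1234\", \"itemid\": \"abc12\"}}";
Event event;
for (int i = 1; i < 200000; i++) {
    event = EventBuilder.withBody(sampleDatauser, StandardCharsets.UTF_8);
    buffer.add(event);
    // send every 100 events as one batch
    if (i % 100 == 0) {
        client.sendDataToFlume(buffer);
        buffer.clear();
    }
}
// send the last partial batch (the loop above leaves 99 events unsent)
if (!buffer.isEmpty()) {
    client.sendDataToFlume(buffer);
}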

 

My Flume configuration is:

balance.sources=syslog_tail
balance.sinks=hdfsSink
balance.channels=kafkaChannel

balance.sources.syslog_tail.type=avro
balance.sources.syslog_tail.threads=20
balance.sources.syslog_tail.bind=192.168.0.104
balance.sources.syslog_tail.port=44444
balance.sources.syslog_tail.channels=kafkaChannel

#balance.sources.syslog_tail.interceptors=i1
#balance.sources.syslog_tail.interceptors.i1.type=timestamp

balance.channels.kafkaChannel.channel=kafkaChannel
balance.channels.kafkaChannel.type=org.apache.flume.channel.kafka.KafkaChannel
balance.channels.kafkaChannel.kafka.topic=test
balance.channels.kafkaChannel.kafka.consumer.group.id=flume-consumer
balance.channels.kafkaChannel.kafka.bootstrap.servers=localhost:9092
#balance.channels.kafkaChannel.kafka.defaultPartitionId=0

balance.sinks.hdfsSink.channel=kafkaChannel
#balance.sinks.hdfsSink.type=logger
balance.sinks.hdfsSink.type = hdfs
balance.sinks.hdfsSink.hdfs.path=hdfs://localhost:9000/kafkaData/%Y-%m/%d
balance.sinks.hdfsSink.hdfs.fileType=DataStream
balance.sinks.hdfsSink.hdfs.writeFormat=Text
balance.sinks.hdfsSink.hdfs.rollSize=128000000
balance.sinks.hdfsSink.hdfs.rollCount=0
balance.sinks.hdfsSink.hdfs.round=true
balance.sinks.hdfsSink.hdfs.rollInterval=60
balance.sinks.hdfsSink.hdfs.roundUnit=day
balance.sinks.hdfsSink.hdfs.roundValue=1
balance.sinks.hdfsSink.hdfs.threadsPoolSize=25
balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
balance.sinks.hdfsSink.hdfs.minBlockReplicas=1
balance.sinks.hdfsSink.hdfs.idleTimeout=30
#balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
balance.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d

 

But when I check the content in Kafka, it looks like this:

?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

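One plausible reading of the leading `?`, assuming the channel runs with its default `parseAsFlumeEvent=true`: in that mode the Kafka channel does not put the raw event body on the topic but an Avro-serialized `AvroFlumeEvent` (a `headers` map followed by the `body` bytes), so each record starts with a few binary framing bytes. The JDK-only sketch below hand-rolls that Avro binary layout to show which bytes would precede the JSON. The class name `FlumeEventFraming` is hypothetical, it is not Flume's own serializer, and it assumes the events carry no headers (the timestamp interceptor is commented out in the configuration).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, JDK-only sketch of the Avro binary layout of a Flume
// AvroFlumeEvent record: a map<string> of headers followed by the body bytes.
// It mimics the wire format; it is not Flume's own serializer.
final class FlumeEventFraming {

    // Avro "long": zig-zag map the value, then emit 7 bits per byte, low
    // bits first, with the high bit set on every byte except the last.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro "string" and "bytes": an Avro long length, then the raw bytes.
    static void writeBytes(ByteArrayOutputStream out, byte[] data) {
        writeLong(out, data.length);
        out.write(data, 0, data.length);
    }

    static byte[] encode(Map<String, String> headers, byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // headers: one block with all entries, then a 0 count ending the map,
        // so an event without headers starts with the single byte 0x00
        if (!headers.isEmpty()) {
            writeLong(out, headers.size());
            for (Map.Entry<String, String> e : headers.entrySet()) {
                writeBytes(out, e.getKey().getBytes(StandardCharsets.UTF_8));
                writeBytes(out, e.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        writeLong(out, 0);
        // body: length prefix, then the JSON payload unchanged
        writeBytes(out, body);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        String json = "{\"cmd\":\"add\", \"fields\":{\"timestamp\":1489040079, "
                + "\"action_type\":\"view\", \"userid\": \"1234\", \"itemid\": \"abc12\"}}";
        byte[] body = json.getBytes(StandardCharsets.UTF_8);
        byte[] record = encode(new LinkedHashMap<>(), body);
        // print only the framing bytes that precede the JSON text
        StringBuilder hex = new StringBuilder();
        for (int i = 0; i < record.length - body.length; i++) {
            hex.append(String.format("%02X ", record[i] & 0xFF));
        }
        System.out.println(hex.toString().trim());
    }
}
```

With the reporter's 107-byte JSON body, `main` prints `00 D6 01`: the `0x00` that ends the empty header map, then the zig-zag varint encoding of the body length. None of these bytes is printable, which would fit a console consumer showing them as `?`. If the topic should hold only the raw body, the Kafka channel documentation describes setting `parseAsFlumeEvent=false`, in which case the channel writes just the event body and headers are not kept in the topic.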
 

I also wrote a program to read the content from Kafka, and what it gets looks the same:

?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

 

But when I check HDFS, the content looks like this:

{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}

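The clean files in HDFS fit the same reading: with `parseAsFlumeEvent=true` the channel decodes each Kafka record back into a Flume event before the HDFS sink takes it, so the sink only writes the body. What follows is a JDK-only sketch of that decoding step, which a standalone Kafka reader like the one mentioned above could also apply. `FlumeEventDecoding` is a hypothetical name; a real consumer would more likely use Avro's `SpecificDatumReader` with Flume's generated `AvroFlumeEvent` class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

// Hypothetical, JDK-only sketch that reads an Avro-framed Flume event
// (map<string> headers, then bytes body) from a raw Kafka record value.
final class FlumeEventDecoding {

    private final byte[] buf;
    private int pos;

    private FlumeEventDecoding(byte[] buf) {
        this.buf = buf;
    }

    // Avro "long": base-128 varint, low bits first, then undo zig-zag.
    private long readLong() {
        long z = 0;
        int shift = 0;
        int b;
        do {
            if (pos >= buf.length || shift > 63) {
                throw new IllegalArgumentException("malformed varint at " + pos);
            }
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // Avro "string" and "bytes": an Avro long length, then that many bytes.
    private byte[] readBytes() {
        long len = readLong();
        if (len < 0 || len > buf.length - pos) {
            throw new IllegalArgumentException("bad length " + len + " at " + pos);
        }
        byte[] out = Arrays.copyOfRange(buf, pos, pos + (int) len);
        pos += (int) len;
        return out;
    }

    // Copies any headers into `headers` and returns the event body.
    static byte[] decode(byte[] record, Map<String, String> headers) {
        FlumeEventDecoding in = new FlumeEventDecoding(record);
        // map blocks: a count, that many key/value pairs, repeated until 0
        long count;
        while ((count = in.readLong()) != 0) {
            if (count < 0) {   // negative count: a block size in bytes follows
                in.readLong(); // the size is not needed here, so skip it
                count = -count;
            }
            for (long i = 0; i < count; i++) {
                String key = new String(in.readBytes(), StandardCharsets.UTF_8);
                String value = new String(in.readBytes(), StandardCharsets.UTF_8);
                headers.put(key, value);
            }
        }
        return in.readBytes();
    }
}
```

Calling `decode(recordValue, new HashMap<>())` on a record value that starts with `00 D6 01` would return the 107 bytes of JSON that follow, which is what ends up in the HDFS files.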
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
