[ https://issues.apache.org/jira/browse/FLUME-3236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446496#comment-17446496 ]

Haibo Wu commented on FLUME-3236:
---------------------------------

You should add the following configuration to your Kafka channel:

parseAsFlumeEvent = false

With this setting the channel no longer serializes events from the source as Avro Flume events; it writes and reads the message body as plain text, so the content in Kafka matches what was sent.
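Applied to the agent and channel names in the config quoted below, a minimal sketch of that one line:

```properties
# Treat Kafka message bodies as plain text rather than
# Avro-serialized Flume events.
balance.channels.kafkaChannel.parseAsFlumeEvent = false
```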

> use kafka channel but the content in kafka is abnormal
> ------------------------------------------------------
>
>                 Key: FLUME-3236
>                 URL: https://issues.apache.org/jira/browse/FLUME-3236
>             Project: Flume
>          Issue Type: Bug
>            Reporter: LL
>            Priority: Major
>
> I use a Mac-based environment with Kafka 2.11-1.1.0 and Flume 1.8.
> I wrote a client with the Flume SDK on another machine, like this:
> MyRpcClientFacade client = new MyRpcClientFacade();
> client.init();
> List<Event> buffer = new ArrayList<Event>();
> String sampleDataitem = "{\"cmd\":\"add\", \"fields\": {\"itemid\": \"28394556\", \"cateid\": \"9_2_1\", \"score\": 459, \"title\": \"天穿修炼记最新版\", \"item_tags\": \"修仙\"}}";
> String sampleDatauser = "{\"cmd\":\"add\", \"fields\":{\"timestamp\":1489040079, \"action_type\":\"view\", \"userid\": \"1234\", \"itemid\": \"abc12\"}}";
> Event event;
> for (int i = 1; i < 200000; i++) {
>     event = EventBuilder.withBody(sampleDatauser, Charset.forName("UTF-8"));
>     buffer.add(event);
>     if (i % 100 == 0) {
>         client.sendDataToFlume(buffer);
>         buffer.clear();
>     }
> }
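As written, the loop above only flushes when i is a multiple of 100, so a final partial buffer (here, the last 99 events) is never sent. The same batching pattern with a tail flush can be sketched as follows; this is a self-contained illustration, where the Sender interface is a hypothetical stand-in for MyRpcClientFacade.sendDataToFlume, not Flume's API:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSender {
    // Stand-in for the RPC client; in the reporter's code this role is
    // played by MyRpcClientFacade.sendDataToFlume(buffer).
    interface Sender { void send(List<String> batch); }

    // Sends events in batches of batchSize, flushing any leftover tail.
    // Returns the number of batches sent.
    static int sendInBatches(List<String> events, int batchSize, Sender sender) {
        List<String> buffer = new ArrayList<>();
        int batches = 0;
        for (String e : events) {
            buffer.add(e);
            if (buffer.size() == batchSize) {
                sender.send(new ArrayList<>(buffer));
                buffer.clear();
                batches++;
            }
        }
        if (!buffer.isEmpty()) { // flush the tail the original loop drops
            sender.send(new ArrayList<>(buffer));
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < 250; i++) events.add("event-" + i);
        // 250 events in batches of 100: two full batches plus a tail of 50.
        System.out.println(sendInBatches(events, 100, b -> {}));
    }
}
```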
>  
> and my Flume conf is:
> balance.sources=syslog_tail
> balance.sinks=hdfsSink
> balance.channels=kafkaChannel
>
> balance.sources.syslog_tail.type=avro
> balance.sources.syslog_tail.threads=20
> balance.sources.syslog_tail.bind=192.168.0.104
> balance.sources.syslog_tail.port=44444
> balance.sources.syslog_tail.channels=kafkaChannel
>
> #balance.sources.syslog_tail.interceptors=i1
> #balance.sources.syslog_tail.interceptors.i1.type=timestamp
>
> balance.channels.kafkaChannel.channel=kafkaChannel
> balance.channels.kafkaChannel.type=org.apache.flume.channel.kafka.KafkaChannel
> balance.channels.kafkaChannel.kafka.topic=test
> balance.channels.kafkaChannel.kafka.consumer.group.id=flume-consumer
> balance.channels.kafkaChannel.kafka.bootstrap.servers=localhost:9092
> #balance.channels.kafkaChannel.kafka.defaultPartitionId=0
>
> balance.sinks.hdfsSink.channel=kafkaChannel
> #balance.sinks.hdfsSink.type=logger
> balance.sinks.hdfsSink.type=hdfs
> balance.sinks.hdfsSink.hdfs.path=hdfs://localhost:9000/kafkaData/%Y-%m/%d
> balance.sinks.hdfsSink.hdfs.fileType=DataStream
> balance.sinks.hdfsSink.hdfs.writeFormat=Text
> balance.sinks.hdfsSink.hdfs.rollSize=128000000
> balance.sinks.hdfsSink.hdfs.rollCount=0
> balance.sinks.hdfsSink.hdfs.round=true
> balance.sinks.hdfsSink.hdfs.rollInterval=60
> balance.sinks.hdfsSink.hdfs.roundUnit=day
> balance.sinks.hdfsSink.hdfs.roundValue=1
> balance.sinks.hdfsSink.hdfs.threadsPoolSize=25
> balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
> balance.sinks.hdfsSink.hdfs.minBlockReplicas=1
> balance.sinks.hdfsSink.hdfs.idleTimeout=30
> #balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
> balance.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d
>  
> but when I check the content in my Kafka topic, it looks like this (note the extra leading bytes before each JSON record):
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>  
> I also wrote a program to read from Kafka, and it sees the same content:
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>  
> but when I check my HDFS, the content looks like this:
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
