[ https://issues.apache.org/jira/browse/FLUME-3236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446496#comment-17446496 ]
Haibo Wu commented on FLUME-3236:
---------------------------------
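You should add the following configuration to your Kafka channel:
==> balance.channels.kafkaChannel.parseAsFlumeEvent = false
With this setting, the Kafka channel no longer writes the events it receives
from the Avro source to the topic as serialized Avro Flume events. It writes
only the event body, so the messages in Kafka are plain text. The HDFS output
already looks correct because the channel decodes the Avro framing again
before handing the events to the sink.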
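A minimal sketch of where the leading `?` comes from. It assumes the Kafka channel encodes each event as the Avro record `AvroFlumeEvent { map<string> headers; bytes body; }` when `parseAsFlumeEvent` is true (the default) and writes only the body when it is false. For an event with no headers (as produced by `EventBuilder.withBody`, assuming no interceptor adds any), the record is a `0x00` byte for the empty map, then the zigzag varint length of the body, then the body itself; those non-printable leading bytes are what a console shows as `?`. `ChannelPayloadSketch` and `payload` are hypothetical names, not Flume API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: mimics the message value a Kafka channel would produce
// for one event, assuming the Avro binary encoding of
// AvroFlumeEvent { map<string> headers; bytes body; }.
public class ChannelPayloadSketch {

    // Avro "long": zigzag-encode, then write as a little-endian base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro "bytes" / "string": the length as an Avro long, then the raw bytes.
    static void writeBytes(ByteArrayOutputStream out, byte[] b) {
        writeLong(out, b.length);
        out.write(b, 0, b.length);
    }

    // parseAsFlumeEvent = true  -> Avro-encoded AvroFlumeEvent (headers + body)
    // parseAsFlumeEvent = false -> the body only; headers are not in the value
    static byte[] payload(Map<String, String> headers, byte[] body, boolean parseAsFlumeEvent) {
        if (!parseAsFlumeEvent) {
            return body.clone();
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!headers.isEmpty()) {
            writeLong(out, headers.size()); // a single block holding every entry
            for (Map.Entry<String, String> e : headers.entrySet()) {
                writeBytes(out, e.getKey().getBytes(StandardCharsets.UTF_8));
                writeBytes(out, e.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        writeLong(out, 0); // end of map; an empty map is just this 0x00 byte
        writeBytes(out, body);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = "{\"cmd\":\"add\"}".getBytes(StandardCharsets.UTF_8);
        byte[] framed = payload(new TreeMap<>(), body, true);
        byte[] raw = payload(new TreeMap<>(), body, false);
        // framed = 0x00 (empty headers), varint length of the body, then the body
        System.out.printf("framed: %d bytes, prefix %02x %02x%n", framed.length, framed[0], framed[1]);
        System.out.printf("raw:    %d bytes%n", raw.length);
    }
}
```

Because only the body is written when the setting is false, the event headers are not serialized into the message value. The sink in the quoted configuration sets `hdfs.useLocalTimeStamp=true`, so its `%Y-%m/%d` path escapes do not rely on a timestamp header.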
> use kafka channel but the content in kafka is abnormal
> ------------------------------------------------------
>
> Key: FLUME-3236
> URL: https://issues.apache.org/jira/browse/FLUME-3236
> Project: Flume
> Issue Type: Bug
> Reporter: LL
> Priority: Major
>
> I am using a macOS environment with Kafka 2.11-1.1.0 (the Scala 2.11 build of Kafka 1.1.0) and Flume 1.8.
> I wrote a client with the Flume SDK on another machine, like this:
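> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flume.Event;
> import org.apache.flume.event.EventBuilder;
>
> // MyRpcClientFacade wraps the Flume RPC client (class not shown)
> MyRpcClientFacade client = new MyRpcClientFacade();
> client.init();
> List<Event> buffer = new ArrayList<Event>();
> String sampleDataitem = "{\"cmd\":\"add\", \"fields\": {\"itemid\": \"28394556\", \"cateid\": \"9_2_1\", \"score\": 459, \"title\": \"天穿修炼记最新版\", \"item_tags\": \"修仙\"}}";
> String sampleDatauser = "{\"cmd\":\"add\", \"fields\":{\"timestamp\":1489040079, \"action_type\":\"view\", \"userid\": \"1234\", \"itemid\": \"abc12\"}}";
> for (int i = 1; i < 200000; i++) {
>     Event event = EventBuilder.withBody(sampleDatauser, StandardCharsets.UTF_8);
>     buffer.add(event);
>     // send every 100 events as one batch
>     if (i % 100 == 0) {
>         client.sendDataToFlume(buffer);
>         buffer.clear();
>     }
> }
> // send the events left over from the last, incomplete batch
> if (!buffer.isEmpty()) {
>     client.sendDataToFlume(buffer);
> }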
>
> and my Flume configuration is:
> balance.sources=syslog_tail
> balance.sinks=hdfsSink
> balance.channels=kafkaChannel
>
> balance.sources.syslog_tail.type=avro
> balance.sources.syslog_tail.threads=20
> balance.sources.syslog_tail.bind=192.168.0.104
> balance.sources.syslog_tail.port=44444
> balance.sources.syslog_tail.channels=kafkaChannel
>
> #balance.sources.syslog_tail.interceptors=i1
> #balance.sources.syslog_tail.interceptors.i1.type=timestamp
>
> balance.channels.kafkaChannel.channel=kafkaChannel
> balance.channels.kafkaChannel.type=org.apache.flume.channel.kafka.KafkaChannel
> balance.channels.kafkaChannel.kafka.topic=test
> balance.channels.kafkaChannel.kafka.consumer.group.id=flume-consumer
> balance.channels.kafkaChannel.kafka.bootstrap.servers=localhost:9092
> #balance.channels.kafkaChannel.kafka.defaultPartitionId=0
>
> balance.sinks.hdfsSink.channel=kafkaChannel
> #balance.sinks.hdfsSink.type=logger
> balance.sinks.hdfsSink.type=hdfs
> balance.sinks.hdfsSink.hdfs.path=hdfs://localhost:9000/kafkaData/%Y-%m/%d
> balance.sinks.hdfsSink.hdfs.fileType=DataStream
> balance.sinks.hdfsSink.hdfs.writeFormat=Text
> balance.sinks.hdfsSink.hdfs.rollSize=128000000
> balance.sinks.hdfsSink.hdfs.rollCount=0
> balance.sinks.hdfsSink.hdfs.round=true
> balance.sinks.hdfsSink.hdfs.rollInterval=60
> # hdfs.roundUnit accepts second, minute or hour; other values disable rounding
> balance.sinks.hdfsSink.hdfs.roundUnit=day
> balance.sinks.hdfsSink.hdfs.roundValue=1
> balance.sinks.hdfsSink.hdfs.threadsPoolSize=25
> balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
> balance.sinks.hdfsSink.hdfs.minBlockReplicas=1
> balance.sinks.hdfsSink.hdfs.idleTimeout=30
> #balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
> balance.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d
>
> but when I check the content in Kafka, it looks like this:
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>
> and a program I wrote to read the content from Kafka gets the same thing:
> ?{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>
> but when I check HDFS, the content looks like this:
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
> {"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view", "userid": "1234", "itemid": "abc12"}}
>
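Messages already written with the default `parseAsFlumeEvent = true` keep their Avro framing, so a standalone consumer like the reporter's program has to decode it. The Flume documentation points to `org.apache.flume.source.avro.AvroFlumeEvent` from the flume-ng-sdk artifact for that. The stdlib-only sketch below decodes the same layout by hand, assuming the Avro binary encoding of `{ map<string> headers; bytes body; }`; `FlumeEventDecoder` and `decode` are hypothetical names, and real code would normally use Avro's own reader instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for a Kafka message value holding an Avro-encoded
// AvroFlumeEvent { map<string> headers; bytes body; } (assumed layout).
public class FlumeEventDecoder {

    private final byte[] buf;
    private int pos;

    private FlumeEventDecoder(byte[] buf) {
        this.buf = buf;
    }

    // Avro "long": little-endian base-128 varint, then zigzag-decode.
    private long readLong() {
        long z = 0;
        int shift = 0;
        while (true) {
            if (pos >= buf.length) throw new IllegalArgumentException("truncated varint");
            int b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
            if (shift > 63) throw new IllegalArgumentException("varint too long");
        }
        return (z >>> 1) ^ -(z & 1);
    }

    // Avro "bytes": the length as an Avro long, then that many raw bytes.
    private byte[] readBytes() {
        long len = readLong();
        if (len < 0 || len > buf.length - pos) throw new IllegalArgumentException("bad length");
        byte[] out = Arrays.copyOfRange(buf, pos, pos + (int) len);
        pos += (int) len;
        return out;
    }

    private String readString() {
        return new String(readBytes(), StandardCharsets.UTF_8);
    }

    // Returns the event body and copies the headers into headersOut.
    static byte[] decode(byte[] record, Map<String, String> headersOut) {
        FlumeEventDecoder d = new FlumeEventDecoder(record);
        // An Avro map is a series of blocks: a count, then entries; count 0 ends it.
        long count;
        while ((count = d.readLong()) != 0) {
            if (count < 0) { // negative count: a block size in bytes follows; skip it
                count = -count;
                d.readLong();
            }
            for (long i = 0; i < count; i++) {
                String key = d.readString();
                headersOut.put(key, d.readString());
            }
        }
        return d.readBytes();
    }

    public static void main(String[] args) {
        // 0x00 = empty headers map, 0x04 = zigzag-encoded length 2, then "hi"
        byte[] record = {0x00, 0x04, 'h', 'i'};
        Map<String, String> headers = new LinkedHashMap<>();
        String body = new String(decode(record, headers), StandardCharsets.UTF_8);
        System.out.println(body + " " + headers);
    }
}
```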