LL created FLUME-3236:
-------------------------
Summary: using the Kafka channel, the content stored in Kafka is abnormal
Key: FLUME-3236
URL: https://issues.apache.org/jira/browse/FLUME-3236
Project: Flume
Issue Type: Bug
Reporter: LL
I am running on macOS, with Kafka 2.11-1.1.0 and Flume 1.8.
I wrote a client with the Flume SDK on another machine, like this:
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

MyRpcClientFacade client = new MyRpcClientFacade();
client.init();
List<Event> buffer = new ArrayList<Event>();
// Sample payloads; only sampleDatauser is sent in the loop below.
String sampleDataitem = "{\"cmd\":\"add\", \"fields\": {\"itemid\": \"28394556\", \"cateid\": \"9_2_1\", \"score\": 459, \"title\": \"天穿修炼记最新版\", \"item_tags\": \"修仙\"}}";
String sampleDatauser = "{\"cmd\":\"add\", \"fields\":{\"timestamp\":1489040079, \"action_type\":\"view\", \"userid\": \"1234\", \"itemid\": \"abc12\"}}";
Event event;
for (int i = 1; i < 200000; i++) {
    event = EventBuilder.withBody(sampleDatauser, Charset.forName("UTF-8"));
    buffer.add(event);
    if (i % 100 == 0) {
        // Flush in batches of 100 events.
        client.sendDataToFlume(buffer);
        buffer.clear();
    }
}
// Flush any remaining events that did not fill a complete batch.
if (!buffer.isEmpty()) {
    client.sendDataToFlume(buffer);
}
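
(MyRpcClientFacade is not shown above; it is a thin wrapper over the Flume SDK. A minimal sketch of what it presumably looks like, following the RpcClientFactory example in the Flume Developer Guide; the hostname and port are assumed to match the avro source configured below:)

import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class MyRpcClientFacade {
    private RpcClient client;
    private String hostname = "192.168.0.104"; // assumed: the avro source's bind address
    private int port = 44444;                  // assumed: the avro source's port

    public void init() {
        // Create the default (avro) RPC client.
        client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void sendDataToFlume(List<Event> events) {
        try {
            // Send the whole batch in one RPC call.
            client.appendBatch(events);
        } catch (EventDeliveryException e) {
            // Rebuild the client on failure, as in the Developer Guide example;
            // events in this batch are dropped.
            client.close();
            client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
    }

    public void cleanUp() {
        client.close();
    }
}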
and my Flume conf is:
balance.sources=syslog_tail
balance.sinks=hdfsSink
balance.channels=kafkaChannel

balance.sources.syslog_tail.type=avro
balance.sources.syslog_tail.threads=20
balance.sources.syslog_tail.bind=192.168.0.104
balance.sources.syslog_tail.port=44444
balance.sources.syslog_tail.channels=kafkaChannel

#balance.sources.syslog_tail.interceptors=i1
#balance.sources.syslog_tail.interceptors.i1.type=timestamp

balance.channels.kafkaChannel.channel=kafkaChannel
balance.channels.kafkaChannel.type=org.apache.flume.channel.kafka.KafkaChannel
balance.channels.kafkaChannel.kafka.topic=test
balance.channels.kafkaChannel.kafka.consumer.group.id=flume-consumer
balance.channels.kafkaChannel.kafka.bootstrap.servers=localhost:9092
#balance.channels.kafkaChannel.kafka.defaultPartitionId=0

balance.sinks.hdfsSink.channel=kafkaChannel
#balance.sinks.hdfsSink.type=logger
balance.sinks.hdfsSink.type=hdfs
balance.sinks.hdfsSink.hdfs.path=hdfs://localhost:9000/kafkaData/%Y-%m/%d
balance.sinks.hdfsSink.hdfs.fileType=DataStream
balance.sinks.hdfsSink.hdfs.writeFormat=Text
balance.sinks.hdfsSink.hdfs.rollSize=128000000
balance.sinks.hdfsSink.hdfs.rollCount=0
balance.sinks.hdfsSink.hdfs.round=true
balance.sinks.hdfsSink.hdfs.rollInterval=60
balance.sinks.hdfsSink.hdfs.roundUnit=day
balance.sinks.hdfsSink.hdfs.roundValue=1
balance.sinks.hdfsSink.hdfs.threadsPoolSize=25
balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
balance.sinks.hdfsSink.hdfs.minBlockReplicas=1
balance.sinks.hdfsSink.hdfs.idleTimeout=30
#balance.sinks.hdfsSink.hdfs.useLocalTimeStamp=true
balance.sinks.hdfsSink.hdfs.filePrefix=%Y-%m-%d
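
(Note: the channel is left at its defaults here, so parseAsFlumeEvent defaults to true and, per the Flume 1.8 KafkaChannel docs, the channel stores Avro-serialized FlumeEvent datums in the topic. The line below is only a sketch of the alternative setting, based on those docs, not a verified fix:)

# Assumption, from the Flume 1.8 KafkaChannel documentation: with
# parseAsFlumeEvent=false the channel stores plain event bodies instead of
# Avro FlumeEvent datums, so external consumers see the raw payload
# (but Flume event headers are then lost).
balance.channels.kafkaChannel.parseAsFlumeEvent=false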
but when I check the content in my Kafka topic, it looks like this:
_?\{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_?\{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_?\{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_?\{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
I also wrote a program to read the content from Kafka, and it looks the same (a sketch of a decoding consumer follows the sample below):
_?\{"cmd":"add", "fields":{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
but when I check my HDFS output, the content looks like this:
_{"cmd":"add", "fields":\{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_{"cmd":"add", "fields":\{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_{"cmd":"add", "fields":\{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_
_{"cmd":"add", "fields":\{"timestamp":1489040079, "action_type":"view",
"userid": "1234", "itemid": "abc12"}}_