[
https://issues.apache.org/jira/browse/FLUME-3074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhiqiangzhao updated FLUME-3074:
--------------------------------
Description:
When I use the Kafka sink to push logs from a Windows path to the Kafka server, I get
the following format error.
I think it is because the Windows path was parsed in Linux style.
17 Mar 2017 11:54:29,476 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Non integer partition id specified
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:214)
        ... 3 more
Caused by: java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:580)
        at java.lang.Integer.parseInt(Integer.java:615)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:202)
        ... 3 more
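For reference, the final NumberFormatException can be reproduced outside Flume. The
snippet below (the class name is just illustrative) parses the same value that the
trace shows being handed to Integer.parseInt:

// Minimal reproduction of the NumberFormatException in the trace above.
public class PartitionHeaderParseRepro {
    public static void main(String[] args) {
        // This is the value reported in the trace, i.e. the file path that
        // ends up in the event header rather than a numeric partition id.
        String headerValue = "C:\\Users\\smart\\Desktop\\source\\23.txt";
        // Throws java.lang.NumberFormatException: For input string: "C:\Users\smart\Desktop\source\23.txt"
        int partitionId = Integer.parseInt(headerValue);
        System.out.println(partitionId);
    }
}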
My config is:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
tier1.sources = s1
tier1.channels = c1
tier1.sinks = sk1
tier1.sources.s1.type = spooldir
tier1.sources.s1.spoolDir = C:\\Users\\smart\\Desktop\\source
tier1.sources.s1.fileHeader = true
tier1.sources.s1.fileHeaderKey = key
tier1.sources.s1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
tier1.sources.s1.deserializer.maxBlobLength = 2000000000
tier1.sources.s1.deserializer.maxBackoff = 30000
tier1.sources.s1.channels = c1
tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 10004
tier1.channels.c1.transactionCapacity = 100
tier1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sk1.topic = winSink
tier1.sinks.sk1.brokerList = host1:9092,host2:9092,host3:9092,host4:9092
tier1.sinks.sk1.partitionIdHeader = key
tier1.sinks.sk1.channel = c1
tier1.sinks.sk1.batchSize = 20
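As I read this config (a rough sketch of my understanding, not the actual Flume code):
fileHeader = true with fileHeaderKey = key makes the spooling directory source store the
absolute file path in the "key" header, and partitionIdHeader = key then makes the Kafka
sink read that same header as a partition id, roughly like this:

import java.util.HashMap;
import java.util.Map;

// Rough sketch of the header flow under the config above, using a plain map
// in place of real Flume event headers (illustrative only, not Flume source code).
public class HeaderFlowSketch {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();

        // spooldir source, fileHeader = true, fileHeaderKey = key:
        // the absolute path of the ingested file is stored under "key".
        headers.put("key", "C:\\Users\\smart\\Desktop\\source\\23.txt");

        // Kafka sink, partitionIdHeader = key: the same header is read back
        // and expected to hold an integer partition id.
        String partitionIdValue = headers.get("key");
        try {
            int partitionId = Integer.parseInt(partitionIdValue);
            System.out.println("partition id = " + partitionId);
        } catch (NumberFormatException e) {
            // This is the failure reported in the trace above.
            System.out.println("Non integer partition id: " + e.getMessage());
        }
    }
}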
> format error happened on windows when kafka sink is used
> --------------------------------------------------------
>
> Key: FLUME-3074
> URL: https://issues.apache.org/jira/browse/FLUME-3074
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.7.0
> Environment: windows 10
> Reporter: zhiqiangzhao
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)