[
https://issues.apache.org/jira/browse/FLUME-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
huaicui updated FLUME-3358:
---------------------------
Description:
I have a requirement to stream and filter Kafka topics according to business
rules. Because the filter can leave the List<Event> empty, the whole
pipeline stops working properly.
The filtering logic looks like this:
String key = JsonPath.read(message, "$.key");
switch (key) {
    case "test1":
        return process(key, event);
    case "test2":
        return process(key, event);
    case "test3":
        return process(key, event);
    default:
        return null;
}
When every event in a batch is filtered out, the pipeline (Kafka -> HDFS)
gets stuck in an abnormal state.
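For reference, here is a minimal sketch of what such an interceptor looks like
(the class name and keys are placeholders, and I am assuming process(key, event)
either returns the event or null to drop it; this is not the exact production code):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.jayway.jsonpath.JsonPath;

public class DeviceUsageFilterInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // no state to set up
    }

    @Override
    public Event intercept(Event event) {
        String message = new String(event.getBody(), StandardCharsets.UTF_8);
        String key = JsonPath.read(message, "$.key");
        switch (key) {
            case "test1":
            case "test2":
            case "test3":
                return process(key, event); // keep events with known keys
            default:
                return null;                // drop everything else
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // The batch variant delegates to the single-event method; when every
        // event in the batch is dropped, the returned list is empty, which is
        // exactly the situation that breaks the pipeline.
        List<Event> kept = new ArrayList<>();
        for (Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    // Placeholder for the real business processing.
    private Event process(String key, Event event) {
        return event;
    }

    @Override
    public void close() {
        // nothing to release
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new DeviceUsageFilterInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no interceptor-specific configuration in this sketch
        }
    }
}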
This is my configuration:
# device flume
Test.sources = r1
Test.sinks = k1
Test.channels = c1
Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
Test.sources.r1.kafka.bootstrap.servers = xxxx
Test.sources.r1.topic = xxx
Test.sources.r1.groupId = test_lab_1
Test.sources.r1.kafka.consumer.timeout.ms = 100
Test.sources.r1.interceptors = i1
Test.sources.r1.interceptors.i1.type = com.goe.DeviceUsageDeserializerInterceptor$Builder
(This is my custom interceptor.)
# Describe the sink
Test.sinks.k1.type = hdfs
Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
Test.sinks.k1.hdfs.filePrefix = device-
Test.sinks.k1.hdfs.fileSuffix = .csv
Test.sinks.k1.hdfs.inUseSuffix = .tmp
Test.sinks.k1.hdfs.idleTimeout = 120
Test.sinks.k1.hdfs.writeFormat = Text
Test.sinks.k1.hdfs.batchSize = 100
Test.sinks.k1.hdfs.threadsPoolSize = 10
Test.sinks.k1.hdfs.rollSize = 0
Test.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
Test.channels.c1.type = memory
Test.channels.c1.capacity = 10000
Test.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
Test.sources.r1.channels = c1
Test.sinks.k1.channel = c1
This is the exception:
!image-2020-03-04-19-13-16-068.png!
was:
I have a requirement to stream and filter Kafka topics according to business
rules. Because the filter can leave the List<Event> empty, the whole
pipeline stops working properly.
The filtering logic looks like this:
String key = JsonPath.read(message, "$.key");
switch (key) {
    case "test1":
        return process(key, event);
    case "test2":
        return process(key, event);
    case "test3":
        return process(key, event);
    default:
        return null;
}
When every event in a batch is filtered out, the pipeline (Kafka -> HDFS)
gets stuck in an abnormal state.
This is my configuration:
# device flume
Test.sources = r1
Test.sinks = k1
Test.channels = c1
Test.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
Test.sources.r1.kafka.bootstrap.servers = xxxx
Test.sources.r1.topic = xxx
Test.sources.r1.groupId = test_lab_1
Test.sources.r1.kafka.consumer.timeout.ms = 100
Test.sources.r1.interceptors = i1
Test.sources.r1.interceptors.i1.type = com.cisco.wap.DeviceUsageDeserializerInterceptor$Builder
(This is my custom interceptor.)
# Describe the sink
Test.sinks.k1.type = hdfs
Test.sinks.k1.hdfs.path = /user/naming/%{DeviceDir}
Test.sinks.k1.hdfs.filePrefix = device-
Test.sinks.k1.hdfs.fileSuffix = .csv
Test.sinks.k1.hdfs.inUseSuffix = .tmp
Test.sinks.k1.hdfs.idleTimeout = 120
Test.sinks.k1.hdfs.writeFormat = Text
Test.sinks.k1.hdfs.batchSize = 100
Test.sinks.k1.hdfs.threadsPoolSize = 10
Test.sinks.k1.hdfs.rollSize = 0
Test.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
Test.channels.c1.type = memory
Test.channels.c1.capacity = 10000
Test.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
Test.sources.r1.channels = c1
Test.sinks.k1.channel = c1
This is the exception:
!image-2020-03-04-19-13-16-068.png!
> The Kafka channel does not work properly when the interceptor filters the
> event list down to empty
> ------------------------------------------------------------------------------------------------
>
> Key: FLUME-3358
> URL: https://issues.apache.org/jira/browse/FLUME-3358
> Project: Flume
> Issue Type: Bug
> Components: Kafka Channel
> Affects Versions: 1.9.0
> Reporter: huaicui
> Priority: Blocker
> Attachments: exception.jpg
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]