[ https://issues.apache.org/jira/browse/KAFKA-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Rosenberg updated KAFKA-1180:
-----------------------------------
Description:
We need to create a stream selector that essentially combines the logic
of the BlackList and WhiteList classes (which is not easily exposed in the
high-level consumer API). That is, we want to select a topic that contains a
certain prefix, as long as it doesn't also contain a secondary string.
This should be easy to do with ordinary Java regexes, but we're running into
some issues trying to do this with the WhiteList class only.
We have a pattern that uses negative lookahead, like this:
"test-(?!bad\\b)[\\w]+"
So this should select a topic like "test-good", but exclude a topic like
"test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".
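As a sanity check of the pattern on its own, here is a minimal sketch that runs it through plain java.util.regex, outside of Kafka. The class name TopicPatternCheck and the helper selects are made up for illustration, and the sketch assumes the filter applies the regex as a full-string match; it does not call any Kafka API.

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {
    // The pattern from this report. The doubled backslashes are Java string
    // escapes, so the regex engine sees: test-(?!bad\b)[\w]+
    static final Pattern TOPIC_PATTERN = Pattern.compile("test-(?!bad\\b)[\\w]+");

    // Hypothetical helper: true if the whole topic name matches the pattern.
    // Assumption: a full-string match, as String.matches would perform.
    static boolean selects(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Print the result for each of the three example topics above.
        for (String topic : new String[] {"test-good", "test-bad", "foo-bar"}) {
            System.out.println(topic + " -> " + selects(topic));
        }
    }
}
```

Run standalone, this selects "test-good" and rejects both "test-bad" and "foo-bar", so the regex itself appears to behave as intended.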
Instead, what we see is a NullPointerException in the call to
createMessageStreamsByFilter:
21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:565)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Caused by: java.lang.NullPointerException
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    ... 9 more
was:
We need to create a stream selector that essentially combines the logic
of the BlackList and WhiteList classes (which is not easily exposed in the
high-level consumer API). That is, we want to select a topic that contains a
certain prefix, as long as it doesn't also contain a secondary string.
This should be easy to do with ordinary Java regexes, but we're running into
some issues trying to do this with the WhiteList class only.
We have a pattern that uses negative lookahead, like this:
"test-(?!bad\\b)[\\w]+"
So this should select a topic like "test-good", but exclude a topic like
"test-bad", and also exclude a topic without the "test" prefix, like "foo-bar".
Instead, what we see is a NullPointerException in the ConsumerIterator, and the
consumer just hangs, if we send a message like "test-topic" followed by
"test-bad":
21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
    at kafka.utils.Utils$.inLock(Utils.scala:565)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Caused by: java.lang.NullPointerException
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
    ... 9 more
> WhiteList topic filter gets a NullPointerException on complex Regex
> -------------------------------------------------------------------
>
> Key: KAFKA-1180
> URL: https://issues.apache.org/jira/browse/KAFKA-1180
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.0
> Reporter: Jason Rosenberg
> Assignee: Neha Narkhede
--
This message was sent by Atlassian JIRA
(v6.1.4#6159)