[ https://issues.apache.org/jira/browse/KAFKA-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13846920#comment-13846920 ]
Jason Rosenberg commented on KAFKA-1180:
----------------------------------------

Here's some code which reproduces the issue. Assume zkConnect points to a running zk cluster (and there's also a running kafka instance using the same zkConnect), and also that kafka is running on localhost, using port 'metadataport':

    import com.google.common.collect.ImmutableList;
    import java.util.List;
    import java.util.Properties;
    import kafka.api.OffsetRequest;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.consumer.Whitelist;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // One message to a topic the regex should accept, one it should reject.
    List<KeyedMessage<Integer, String>> toSend = ImmutableList.of(
        new KeyedMessage<Integer, String>("test-topic", "test-message1"),
        new KeyedMessage<Integer, String>("test-bad", "test-message2"));
    String regex = "test-(?!bad\\b)[\\w]+";

    Properties pProps = new Properties();
    pProps.put("metadata.broker.list", "localhost:" + metadataport);
    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig pConfig = new ProducerConfig(pProps);
    Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
    for (KeyedMessage<Integer, String> data : toSend) {
        System.out.println("write");
        producer.send(data);
    }
    producer.close();

    Properties cProps = new Properties();
    cProps.put("zookeeper.connect", zkConnect);
    cProps.put("group.id", "group1");
    cProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString());
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    // Subscribe to every topic matching the whitelist regex, using one stream.
    List<KafkaStream<byte[], byte[]>> streams =
        consumerConnector.createMessageStreamsByFilter(new Whitelist(regex), 1);
    System.out.println("create streams");

> WhiteList topic filter gets a NullPointerException on complex Regex
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1180
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1180
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Neha Narkhede
>
> We need to create a stream selector that essentially combines the
> logic of the Blacklist and Whitelist classes (which is not easily exposed in
> the high-level
> consumer API). That is, we want to select a topic that
> contains a certain prefix, as long as it doesn't also contain a secondary
> string.
> This should be easy to do with ordinary Java regexes, but we're running into
> some issues trying to do this with the Whitelist class alone.
> We have a pattern that uses negative lookahead, like this:
> "test-(?!bad\\b)[\\w]+"
> So this should select a topic like "test-good", but exclude a topic like
> "test-bad", and also exclude a topic without the "test" prefix, like
> "foo-bar".
> Instead, following the call to createMessageStreamsByFilter (after having
> previously sent a message to "test-good" followed by a message to
> "test-bad"), we see a NullPointerException logged by the consumer fetcher
> thread:
> 21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
> kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.utils.Utils$.inLock(Utils.scala:565)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.NullPointerException
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
>     ... 9 more

--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
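To check that the pattern "test-(?!bad\\b)[\\w]+" itself behaves as the issue expects, independently of Kafka, here is a minimal standalone sketch using only java.util.regex. It assumes the whitelist is applied as a full-string match (as Matcher.matches does); that is an assumption for illustration, not a description of Kafka's TopicFilter internals. The class name WhitelistRegexCheck and the helper isAllowedCombined are hypothetical; the latter shows the include-then-exclude selector described in the issue expressed as two plain patterns instead of a lookahead.

```java
import java.util.regex.Pattern;

class WhitelistRegexCheck {
    // The pattern from the report, as a Java string literal.
    static final Pattern LOOKAHEAD = Pattern.compile("test-(?!bad\\b)[\\w]+");

    // Hypothetical two-pattern alternative: include by prefix, then exclude.
    static final Pattern INCLUDE = Pattern.compile("test-[\\w]+");
    static final Pattern EXCLUDE = Pattern.compile("test-bad");

    // Assumes full-string matching semantics (Matcher.matches), not find().
    static boolean isAllowed(String topic) {
        return LOOKAHEAD.matcher(topic).matches();
    }

    // Hypothetical combined whitelist/blacklist check; not a Kafka API.
    static boolean isAllowedCombined(String topic) {
        return INCLUDE.matcher(topic).matches() && !EXCLUDE.matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {"test-good", "test-topic", "test-bad", "foo-bar", "test-badger"};
        for (String topic : topics) {
            // e.g. prints "test-good -> true / true"
            System.out.println(topic + " -> " + isAllowed(topic) + " / " + isAllowedCombined(topic));
        }
    }
}
```

Both checks accept "test-good" and "test-topic" and reject "test-bad" and "foo-bar". Because the lookahead ends in \b, a topic such as "test-badger" is still accepted, since there is no word boundary between "bad" and "ger"; the exact-match exclusion in the second helper gives the same result.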