[ 
https://issues.apache.org/jira/browse/KAFKA-1180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13846920#comment-13846920
 ] 

Jason Rosenberg commented on KAFKA-1180:
----------------------------------------

Here's some code that reproduces the issue.  Assume zkConnect points to a 
running zk cluster (with a running kafka instance using the same zkConnect), 
and that kafka is running on localhost, listening on port 'metadataport':

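    // One message for a topic the filter should select, one for a topic it should exclude
    List<KeyedMessage<Integer, String>> toSend = ImmutableList.of(
            new KeyedMessage<Integer, String>("test-topic", "test-message1"),
            new KeyedMessage<Integer, String>("test-bad", "test-message2"));
    // Negative lookahead: "test-" followed by any word characters except exactly "bad"
    String regex = "test-(?!bad\\b)[\\w]+";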

    Properties pProps = new Properties();
    pProps.put("metadata.broker.list", "localhost:" + metadataport);
    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig pConfig = new ProducerConfig(pProps);
    Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
    for (KeyedMessage<Integer, String> data : toSend) {
      System.out.println("write");
      producer.send(data);
    }
    producer.close();

    Properties cProps = new Properties();
    cProps.put("zookeeper.connect", zkConnect);
    cProps.put("group.id", "group1");
    cProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString());
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
    ConsumerConnector consumerConnector =
        Consumer.createJavaConsumerConnector(consumerConfig);

    List<KafkaStream<byte[], byte[]>> streams =
        consumerConnector.createMessageStreamsByFilter(new Whitelist(regex), 1);
    System.out.println("create streams");
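
As a sanity check that the pattern itself behaves as intended outside of Kafka, here is a minimal sketch using only java.util.regex. The class name TopicRegexCheck and the helper isSelected are made up for illustration, and it assumes the filter is applied as a full-string match, which may not be exactly how the Whitelist class evaluates it:

```java
import java.util.regex.Pattern;

public class TopicRegexCheck {
    // Same pattern as in the reproduction code above
    static final Pattern TOPIC_FILTER = Pattern.compile("test-(?!bad\\b)[\\w]+");

    // Hypothetical helper: true if the whole topic name matches the pattern
    static boolean isSelected(String topic) {
        return TOPIC_FILTER.matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {"test-good", "test-topic", "test-bad", "foo-bar"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + isSelected(topic));
        }
    }
}
```

With plain java.util.regex, "test-good" and "test-topic" are selected while "test-bad" and "foo-bar" are not, so the pattern itself appears to do what the description says.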



> WhiteList topic filter gets a NullPointerException on complex Regex
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1180
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1180
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Neha Narkhede
>
> We need to create a stream selector that essentially combines the 
> logic of the BlackList and WhiteList classes (which is not easily exposed in 
> the high-level consumer api).  That is, we want to select a topic that 
> contains a certain prefix, as long as it doesn't also contain a secondary 
> string.
> This should be easy to do with ordinary Java regexes, but we're running into 
> some issues trying to do this with the WhiteList class only.
> We have a pattern that uses negative lookahead, like this:
> "test-(?!bad\\b)[\\w]+"
> So this should select a topic like: "test-good", but exclude a topic like 
> "test-bad", and also exclude a topic without the "test" prefix, like 
> "foo-bar".
> Instead, what we see is a NullPointerException in the call to 
> createMessageStreamsByFilter (after having previously sent a message to 
> "test-good" followed by a message to "test-bad"):
> 21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to 
> kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>       at kafka.utils.Utils$.inLock(Utils.scala:565)
>       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
>       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.NullPointerException
>       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>       at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
>       ... 9 more



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
