Jason Rosenberg created KAFKA-945:
-------------------------------------

             Summary: Problem with test to send a message and then consume it
                 Key: KAFKA-945
                 URL: https://issues.apache.org/jira/browse/KAFKA-945
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Jason Rosenberg


A simple test, which sends one message synchronously and then consumes it, is 
failing with the latest 0.8 beta release candidate (produced from sha: 
23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).

Note this problem did not occur with a previous version of the 0.8 branch (e.g. 
it seems to work fine for sha:  988d4d8e65a14390abd748318a64e281e4a37c19).

Essentially, it appears that the message never gets sent (after complaining 
about missing partition leader, etc.).
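
If the failure comes from the send racing with leader election for the newly 
auto-created topic (an assumption, not something confirmed here), widening the 
producer's retry window would be one way to check. A minimal sketch using the 
0.8 producer settings message.send.max.retries, retry.backoff.ms and 
request.required.acks; the class name and the chosen values are illustrative 
only:

```java
import java.util.Properties;

public class RetryingProducerProps {
    // Builds producer properties that retry for longer than the defaults.
    // The values below are arbitrary choices for a diagnostic run.
    public static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");           // send blocks until done
        props.put("request.required.acks", "1");      // wait for the leader's ack
        props.put("message.send.max.retries", "10");  // more attempts than default
        props.put("retry.backoff.ms", "500");         // pause between attempts
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; pass them to ProducerConfig in the real test.
        build("localhost:9092").list(System.out);
    }
}
```

If the message goes through with these settings, that would point at timing 
around topic creation rather than at the send path itself.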

To reproduce, run the sample zookeeper and kafka scripts that come with the 
distribution, but first delete all pre-existing state by removing the data 
directories that zookeeper and kafka use:

rm -rf /tmp/zookeeper
rm -rf /tmp/kafka_logs
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

Then execute the simple test code (I will attach a tarball which you should be 
able to unpack to run this example).

  @Test public void produceAndConsumeMessage() throws Exception {

    // assumes zookeeper and kafka server are running.
    String zkConnect = "localhost:2181";
    int port = 9092;

    Properties pProps = new Properties();
    pProps.put("metadata.broker.list", "localhost:" + port);
    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
    ProducerConfig pConfig = new ProducerConfig(pProps);
    Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
    KeyedMessage<Integer, String> data =
        new KeyedMessage<Integer, String>("test-topic", "test-message");
    producer.send(data);
    producer.close();

    Properties cProps = new Properties();
    cProps.put("zookeeper.connect", zkConnect);
    cProps.put("group.id", "group1");
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
    ConsumerConnector consumerConnector =
        Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
        consumerConnector.createMessageStreams(
            ImmutableMap.of("test-topic", 1));
    List<KafkaStream<byte[], byte[]>> streams =
        topicMessageStreams.get("test-topic");
    final KafkaStream<byte[], byte[]> stream = streams.get(0);
    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

    // run in a separate thread
    final AtomicBoolean success = new AtomicBoolean(false);
    Thread consumeThread = new Thread(new Runnable() {
      public void run() {
        while (iter.hasNext()) {
          byte[] msg = iter.next().message();
          String msgStr = new String(msg);
          success.set(msgStr.equals("test-message"));
          break;
        }
      }
    });

    consumeThread.start();
    consumeThread.join();

    consumerConnector.shutdown();
    assertTrue(success.get());
  }
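
Note that with the default consumer settings iter.hasNext() blocks until a 
message arrives, so when the message is never sent the test hangs in 
consumeThread.join() instead of failing. A sketch of a bounded wait using only 
the JDK; BoundedJoin and runWithTimeout are hypothetical names, not part of 
Kafka:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedJoin {
    // Runs the task on its own thread and waits at most timeoutMs for it.
    // Returns true if the task finished in time, false if it is still running.
    // timeoutMs must be greater than 0, because join(0) waits forever.
    public static boolean runWithTimeout(Runnable task, long timeoutMs)
            throws InterruptedException {
        Thread t = new Thread(task);
        t.setDaemon(true); // a still-blocked task must not keep the JVM alive
        t.start();
        t.join(timeoutMs);
        return !t.isAlive();
    }

    public static void main(String[] args) throws Exception {
        final AtomicBoolean ran = new AtomicBoolean(false);
        boolean finished = runWithTimeout(new Runnable() {
            public void run() {
                ran.set(true);
            }
        }, 1000);
        System.out.println("finished=" + finished + " ran=" + ran.get());
    }
}
```

In the test above, the consume loop would be passed as the task, and the test 
would assert on both the return value and success.get().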


