Jason Rosenberg created KAFKA-945:
-------------------------------------
Summary: Problem with test to send a message and then consume it
Key: KAFKA-945
URL: https://issues.apache.org/jira/browse/KAFKA-945
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jason Rosenberg
A simple test, which sends one message synchronously and then consumes it, is
failing with the latest 0.8 beta release candidate (produced from sha:
23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
Note this problem did not occur with a previous version of the 0.8 branch (e.g.
it seems to work fine for sha: 988d4d8e65a14390abd748318a64e281e4a37c19).
Essentially, it appears that the message never gets sent (after complaining
about missing partition leader, etc.).
To reproduce, run the sample zookeeper and kafka scripts that come with the
distribution, but first delete all pre-existing state by removing the data
directories that zookeeper and kafka use:
rm -rf /tmp/zookeeper
rm -rf /tmp/kafka_logs
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
Then execute the simple test code (I will attach a tarball which you should be
able to unpack in order to run this example).
@Test
public void produceAndConsumeMessage() throws Exception {
  // assumes zookeeper and kafka server are running.
  String zkConnect = "localhost:2181";
  int port = 9092;

  Properties pProps = new Properties();
  pProps.put("metadata.broker.list", "localhost:" + port);
  pProps.put("serializer.class", "kafka.serializer.StringEncoder");
  ProducerConfig pConfig = new ProducerConfig(pProps);
  Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
  KeyedMessage<Integer, String> data =
      new KeyedMessage<Integer, String>("test-topic", "test-message");
  producer.send(data);
  producer.close();

  Properties cProps = new Properties();
  cProps.put("zookeeper.connect", zkConnect);
  cProps.put("group.id", "group1");
  ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
  ConsumerConnector consumerConnector =
      Consumer.createJavaConsumerConnector(consumerConfig);
  Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
      consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
  List<KafkaStream<byte[], byte[]>> streams =
      topicMessageStreams.get("test-topic");
  final KafkaStream<byte[], byte[]> stream = streams.get(0);
  final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

  // run in a separate thread
  final AtomicBoolean success = new AtomicBoolean(false);
  Thread consumeThread = new Thread(new Runnable() {
    public void run() {
      while (iter.hasNext()) {
        byte[] msg = iter.next().message();
        String msgStr = new String(msg);
        success.set(msgStr.equals("test-message"));
        break;
      }
    }
  });
  consumeThread.start();
  consumeThread.join();
  consumerConnector.shutdown();
  assertTrue(success.get());
}
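Because the test above calls consumeThread.join() with no timeout, it can hang
instead of failing when the message never arrives. The following is only a
rough sketch of a bounded wait, written against the plain JDK so it runs
without a broker. The class name BoundedConsumeSketch, the helper awaitMessage,
and the BlockingQueue standing in for the blocking ConsumerIterator are all
hypothetical and are not part of the attached test or of the Kafka API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class BoundedConsumeSketch {

    /**
     * Waits at most timeoutMs (must be > 0) for a worker thread to read
     * `expected` from `source`. The queue is a hypothetical stand-in for
     * the blocking consumer iterator used in the test.
     */
    static boolean awaitMessage(final BlockingQueue<String> source,
                                final String expected,
                                long timeoutMs) throws InterruptedException {
        final AtomicBoolean success = new AtomicBoolean(false);
        Thread consumeThread = new Thread(new Runnable() {
            public void run() {
                try {
                    // take() blocks until a message is available
                    String msg = source.take();
                    success.set(expected.equals(msg));
                } catch (InterruptedException e) {
                    // interrupted after the timeout; success stays false
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumeThread.setDaemon(true);
        consumeThread.start();
        // bounded wait instead of an unbounded join()
        consumeThread.join(timeoutMs);
        if (consumeThread.isAlive()) {
            // nothing arrived in time; unblock the worker
            consumeThread.interrupt();
        }
        return success.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> delivered = new LinkedBlockingQueue<String>();
        delivered.put("test-message");
        System.out.println(awaitMessage(delivered, "test-message", 1000));

        BlockingQueue<String> neverDelivered = new LinkedBlockingQueue<String>();
        System.out.println(awaitMessage(neverDelivered, "test-message", 200));
    }
}
```

With a wait like this, the final assertTrue(success.get()) would report a
failure after the timeout rather than blocking the test run indefinitely.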