[ https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Rosenberg resolved KAFKA-945.
-----------------------------------
Resolution: Invalid
> Problem with test to send a message and then consume it
> -------------------------------------------------------
>
> Key: KAFKA-945
> URL: https://issues.apache.org/jira/browse/KAFKA-945
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Jason Rosenberg
> Attachments: kafak-945.out, kafka-945.tar.gz
>
>
> A simple test, which sends one message synchronously and then consumes it, is
> failing with the latest 0.8 beta release candidate (produced from sha:
> 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> Note this problem did not occur with a previous version of the 0.8 branch
> (e.g. it seems to work fine for sha:
> 988d4d8e65a14390abd748318a64e281e4a37c19).
> Essentially, it appears that the message never gets sent (after complaining
> about a missing partition leader, etc.).
> To reproduce, first delete all pre-existing state by removing the data
> directories that zookeeper and kafka use:
> rm -rf /tmp/zookeeper
> rm -rf /tmp/kafka_logs
> Then run the sample zookeeper and kafka scripts that come with the
> distribution:
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> Then execute the simple test code below (I will attach a tarball which you
> should be able to unpack to run this example).
> @Test
> public void produceAndConsumeMessage() throws Exception {
>   // assumes zookeeper and kafka server are running.
>   String zkConnect = "localhost:2181";
>   int port = 9092;
>
>   Properties pProps = new Properties();
>   pProps.put("metadata.broker.list", "localhost:" + port);
>   pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>   ProducerConfig pConfig = new ProducerConfig(pProps);
>   Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
>   KeyedMessage<Integer, String> data =
>       new KeyedMessage<Integer, String>("test-topic", "test-message");
>   producer.send(data);
>   producer.close();
>
>   Properties cProps = new Properties();
>   cProps.put("zookeeper.connect", zkConnect);
>   cProps.put("group.id", "group1");
>   ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>   ConsumerConnector consumerConnector =
>       Consumer.createJavaConsumerConnector(consumerConfig);
>   Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>       consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
>   List<KafkaStream<byte[], byte[]>> streams =
>       topicMessageStreams.get("test-topic");
>   final KafkaStream<byte[], byte[]> stream = streams.get(0);
>   final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>
>   // run in a separate thread
>   final AtomicBoolean success = new AtomicBoolean(false);
>   Thread consumeThread = new Thread(new Runnable() {
>     public void run() {
>       while (iter.hasNext()) {
>         byte[] msg = iter.next().message();
>         String msgStr = new String(msg);
>         success.set(msgStr.equals("test-message"));
>         break;
>       }
>     }
>   });
>   consumeThread.start();
>   consumeThread.join();
>   consumerConnector.shutdown();
>   assertTrue(success.get());
> }