[ 
https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284848#comment-16284848
 ] 

Narendra Kumar commented on KAFKA-6339:
---------------------------------------

Hi Dhruv, can you please let us know more about the problem you are facing?

> Integration test with embedded kafka.
> -------------------------------------
>
>                 Key: KAFKA-6339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6339
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.2
>            Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2 and trying to write an integration test 
> for one of the components I am building on top of Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work 
> with the version mentioned above (0.11.0.2):
> {{
> // setup Zookeeper
>               EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
>               String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
>               ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, 
> ZKStringSerializer$.MODULE$);
>               ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
>               // setup Broker
>               Properties brokerProps = new Properties();
>               brokerProps.setProperty("zookeeper.connect", zkConnect);
>               brokerProps.setProperty("broker.id", "0");
>               brokerProps.setProperty("offsets.topic.replication.factor", 
> "1");
>               String kafka_log_path = 
> Files.createTempDirectory("kafka-").toAbsolutePath().toString();
>               System.out.println("kafka log path " + kafka_log_path);
>               brokerProps.setProperty("log.dirs", kafka_log_path);
>               brokerProps.setProperty("listeners", "PLAINTEXT://" + 
> BROKERHOST + ":" + BROKERPORT);
>               KafkaConfig config = new KafkaConfig(brokerProps);
>               Time mock = new MockTime();
>               KafkaServer kafkaServer = TestUtils.createServer(config, mock);
>               // create topic
>               AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
> RackAwareMode.Disabled$.MODULE$);
>               // setup producer
>               Properties producerProps = new Properties();
>               producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>               producerProps.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.IntegerSerializer");
>               producerProps.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.ByteArraySerializer");
>               KafkaProducer<Integer, byte[]> producer = new 
> KafkaProducer<Integer, byte[]>(producerProps);
>               // setup consumer
>               Properties consumerProps = new Properties();
>               consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" 
> + BROKERPORT);
>               consumerProps.setProperty("group.id", "group0");
>               consumerProps.setProperty("client.id", "consumer0");
>               consumerProps.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.IntegerDeserializer");
>               consumerProps.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>               // make sure the consumer starts from the beginning of the topic
>               consumerProps.put("auto.offset.reset", "earliest");
>               KafkaConsumer<Integer, byte[]> consumer = new 
> KafkaConsumer<>(consumerProps);
>               consumer.subscribe(Arrays.asList(TOPIC));
>               // send message
>               ProducerRecord<Integer, byte[]> data = new 
> ProducerRecord<>(TOPIC, 42,
>                               
> "test-message".getBytes(StandardCharsets.UTF_8));
>               Future<RecordMetadata> record = producer.send(data);
>               RecordMetadata metadata = record.get();
>               // starting consumer
>               ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
>               assertEquals(1, records.count());
>               Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = 
> records.iterator();
>               ConsumerRecord<Integer, byte[]> consumedRecord = 
> recordIterator.next();
>               System.out.printf("offset = %d, key = %s, value = %s", 
> consumedRecord.offset(), consumedRecord.key(),
>                               consumedRecord.value());
>               assertEquals(42, (int) consumedRecord.key());
>               assertEquals("test-message", new String(consumedRecord.value(), 
> StandardCharsets.UTF_8));
>               kafkaServer.shutdown();
>               zkClient.close();
>               embeddedZookeeper.shutdown();
> }}
> Please provide support for this. There should also be proper documentation 
> for integration testing with each release.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
