[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15797778#comment-15797778 ]
ASF GitHub Bot commented on TWILL-199: -------------------------------------- Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94551913 --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java --- @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception { Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer .MessageCallback() { @Override - public void onReceived(Iterator<FetchedMessage> messages) { + public long onReceived(Iterator<FetchedMessage> messages) { + long nextOffset = Long.MIN_VALUE; + while (messages.hasNext()) { + FetchedMessage message = messages.next(); + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + latch.countDown(); + } + return nextOffset; + } + + @Override + public void finished() { + stopLatch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + cancel.cancel(); + Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testKafkaClientReadFromIdx() throws Exception { + String topic = "testClient"; + + // Publish 30 messages with indecies the same as offsets within the range 0 - 29 + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); + t1.start(); + t1.join(); + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10); + t2.start(); + t2.join(); + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20); + t3.start(); + t3.join(); + + final int startIdx = 15; + final CountDownLatch latch = new CountDownLatch(30 - startIdx); + final CountDownLatch stopLatch = new CountDownLatch(1); + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>(); + // Creater a consumer + final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer(); + Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer + .MessageCallback() { + long minOffset = -2; // earliest msg + long maxOffset = -1; // latest msg + @Override + // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset --- End diff -- Seems like this unit-test is unnecessarily complicated. All you want to test is that the offset being returned from the `onReceived` method is being honored, right? > Get next offset and handle offset error in KafkaConsumer.MessageCallback > ------------------------------------------------------------------------ > > Key: TWILL-199 > URL: https://issues.apache.org/jira/browse/TWILL-199 > Project: Apache Twill > Issue Type: Improvement > Reporter: Chengfeng Mao > > The method {{void onReceived(Iterator<FetchedMessage> messages)}} in > {{KafkaConsumer.MessageCallback}} can be more flexible with the change to > {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide > additional functionalities: > 1. To return the next offset to be fetched > 2. To handle offset non-existence or offset mismatch error and take action on > the error > This method will return null for backward compatibility when it doesn't need > to provide the next offset. > In concrete implementation, a class of a new interface > {{KafkaOffsetProvider}} can be added as a member in > {{KafkaConsumer.MessageCallback}} to perform the offset error handling and > provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to > provide the following functionalities: > 1. To fetch earliest/latest offset in Kafka > 2. To find the offset of a message with timestamp equal to the given > timestamp in Kafka > For backward compatibility, if {{KafkaOffsetProvider}} instance is not > provided, its default value will be null and none of its methods will be > called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)