[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799721#comment-15799721 ]
ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94688950

--- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
     Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
       .MessageCallback() {
       @Override
-      public void onReceived(Iterator<FetchedMessage> messages) {
+      public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
+        long nextOffset = startOffset;
         while (messages.hasNext()) {
-          LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+          FetchedMessage message = messages.next();
+          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
           latch.countDown();
         }
+        return nextOffset;
+      }
+
+      @Override
+      public void finished() {
+        stopLatch.countDown();
+      }
+    });
+
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+    cancel.cancel();
+    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClient";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    // 15 messages will be counted since onReceived returns `message.getNextOffset() + 1` as next offset to read
--- End diff --

I don't think the test is correct. You published 30 messages in three message sets, hence the `onReceived` method will be called three times.
The first call gets messages 0-9, and you return 11. The second call gets messages 11-19, and you return 21. The last call gets messages 21-29. So in total more than 15 messages will be counted.

> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide
> additional functionality:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch errors and take action on
> the error
> This method will return null for backward compatibility when it doesn't need
> to provide the next offset.
> In a concrete implementation, an instance of a new interface
> {{KafkaOffsetProvider}} can be added as a member of
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. {{KafkaOffsetProvider}} would also have methods that
> provide the following functionality:
> 1. To fetch the earliest/latest offset in Kafka
> 2. To find the offset of the message whose timestamp equals the given
> timestamp in Kafka
> For backward compatibility, if no {{KafkaOffsetProvider}} instance is
> provided, its default value will be null and none of its methods will be
> called.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
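The reviewer's counting argument can be checked with a small offline model. This Java sketch is not Twill code and makes several assumptions: each fetch returns the rest of the message set that contains the requested offset (sets [0,10), [10,20) and [20,30), matching the three publish threads in the test), a message's next offset is simply its offset plus one, and the callback consumes the whole batch before returning the last message's next offset plus one, as the test comment describes. `SkipNextSimulation`, `fetch`, `onReceived` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical offline model of the reviewed test; not Twill's consumer.
final class SkipNextSimulation {

  // Assumed message-set boundaries: [0,10), [10,20), [20,30).
  static final long[][] MESSAGE_SETS = {{0, 10}, {10, 20}, {20, 30}};

  /** Assumed fetch: the remainder of the message set containing {@code offset}. */
  static List<Long> fetch(long offset) {
    List<Long> batch = new ArrayList<>();
    for (long[] set : MESSAGE_SETS) {
      if (offset >= set[0] && offset < set[1]) {
        for (long o = offset; o < set[1]; o++) {
          batch.add(o);
        }
        break;
      }
    }
    return batch;
  }

  /** Consumes every message, then returns the last message's next offset + 1. */
  static long onReceived(Iterator<Long> messages, long startOffset, List<Long> counted) {
    long nextOffset = startOffset;
    while (messages.hasNext()) {
      long offset = messages.next();
      counted.add(offset);
      long messageNextOffset = offset + 1; // model: next offset is offset + 1
      nextOffset = messageNextOffset + 1;  // the test's getNextOffset() + 1
    }
    return nextOffset;
  }

  /** Drives fetch/callback rounds until a fetch returns nothing. */
  static List<Long> run() {
    List<Long> counted = new ArrayList<>();
    long offset = 0;
    while (true) {
      List<Long> batch = fetch(offset);
      if (batch.isEmpty()) {
        return counted;
      }
      offset = onReceived(batch.iterator(), offset, counted);
    }
  }

  public static void main(String[] args) {
    System.out.println("messages counted: " + run().size());
  }
}
```

Under these assumptions the callback sees offsets 0-9, 11-19 and 21-29, 28 messages in all, so only offsets 10 and 20 are skipped. Reaching 15 would require the callback to handle just one message per call, so that every other offset (0, 2, ..., 28) is visited.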
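The issue description pairs a nullable `Long` return from `onReceived` with an optional `KafkaOffsetProvider` member. The following Java sketch shows one way the two could interact on the consumer side. It is not Twill's API: `Message`, `offsetProvider()`, `batch`, `nextFetchOffset`, `resolveInvalidOffset`, `rangeProvider`, `withProvider` and all provider method names are hypothetical, and resuming right after the last delivered message on a `null` return is an assumed default.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the TWILL-199 proposal, not Twill's actual API.
// Only onReceived, MessageCallback and KafkaOffsetProvider come from the issue.
final class OffsetProposalSketch {

  /** Stand-in for FetchedMessage: here the next offset is just offset + 1. */
  static final class Message {
    final long offset;
    final long nextOffset;

    Message(long offset) {
      this.offset = offset;
      this.nextOffset = offset + 1;
    }
  }

  /** Proposed provider; these method names are assumptions. */
  interface KafkaOffsetProvider {
    long earliestOffset();
    long latestOffset();
    /** Offset of the message whose timestamp equals the given one. */
    long offsetForTimestamp(long timestamp);
    /** Chooses where to resume after a missing or mismatched offset. */
    long handleOffsetError(long requestedOffset);
  }

  /** Proposed callback: returning null keeps the old behaviour. */
  interface MessageCallback {
    Long onReceived(Iterator<Message> messages);

    /** Optional member; null means no provider method is ever called. */
    default KafkaOffsetProvider offsetProvider() {
      return null;
    }
  }

  /** Builds a batch of messages with offsets [from, to). */
  static List<Message> batch(long from, long to) {
    List<Message> messages = new ArrayList<>();
    for (long offset = from; offset < to; offset++) {
      messages.add(new Message(offset));
    }
    return messages;
  }

  /** Next fetch offset after delivering one batch to the callback. */
  static long nextFetchOffset(MessageCallback callback, List<Message> batch, long fetchOffset) {
    Long requested = callback.onReceived(batch.iterator());
    if (requested != null) {
      return requested; // the callback chose the next offset explicitly
    }
    // Assumed default: resume right after the last delivered message.
    return batch.isEmpty() ? fetchOffset : batch.get(batch.size() - 1).nextOffset;
  }

  /** Offset to use when the requested one is invalid; fallback applies without a provider. */
  static long resolveInvalidOffset(MessageCallback callback, long requestedOffset, long fallback) {
    KafkaOffsetProvider provider = callback.offsetProvider();
    return provider == null ? fallback : provider.handleOffsetError(requestedOffset);
  }

  /** Example provider over a fixed range that resets to the latest offset on error. */
  static KafkaOffsetProvider rangeProvider(long earliest, long latest) {
    return new KafkaOffsetProvider() {
      @Override public long earliestOffset() { return earliest; }
      @Override public long latestOffset() { return latest; }
      // Stub: always returns the earliest offset instead of querying Kafka.
      @Override public long offsetForTimestamp(long timestamp) { return earliest; }
      @Override public long handleOffsetError(long requestedOffset) { return latest; }
    };
  }

  /** Wraps a provider in a callback that never picks the offset itself. */
  static MessageCallback withProvider(KafkaOffsetProvider provider) {
    return new MessageCallback() {
      @Override public Long onReceived(Iterator<Message> messages) { return null; }
      @Override public KafkaOffsetProvider offsetProvider() { return provider; }
    };
  }

  public static void main(String[] args) {
    List<Message> messages = batch(0, 10);
    MessageCallback legacy = it -> null;  // old-style callback
    MessageCallback skipping = it -> 15L; // explicitly jumps ahead
    System.out.println(nextFetchOffset(legacy, messages, 0));
    System.out.println(nextFetchOffset(skipping, messages, 0));
    System.out.println(resolveInvalidOffset(legacy, 99, 0));
    System.out.println(resolveInvalidOffset(withProvider(rangeProvider(0, 30)), 99, 0));
  }
}
```

In this sketch a `null` return falls back to the old behaviour (resume at offset 10 for a batch of offsets 0-9), an explicit return such as `15L` overrides it, and `resolveInvalidOffset` only touches the provider when the callback supplies one, in line with the issue's requirement that no provider method be called when none is provided.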