[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802036#comment-15802036 ]
ASF GitHub Bot commented on TWILL-199: -------------------------------------- Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94817038 --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java --- @@ -189,6 +194,49 @@ public void finished() { } @Test + public void testKafkaClientSkipNext() throws Exception { + String topic = "testClient"; + // Publish 30 messages with indecies the same as offsets within the range 0 - 29 + Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10); + t1.start(); + t1.join(); + Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10); + t2.start(); + t2.join(); + Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20); + t3.start(); + t3.join(); + + final CountDownLatch stopLatch = new CountDownLatch(1); + final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>(); + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume( + new KafkaConsumer.MessageCallback() { + @Override + public long onReceived(long startOffset, Iterator<FetchedMessage> messages) { + if (messages.hasNext()) { + offsetQueue.offer(startOffset); + FetchedMessage message = messages.next(); + LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString()); + return message.getNextOffset() + 1; + } + return startOffset; + } + + @Override + public void finished() { + stopLatch.countDown(); + } + }); + // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read + for (int i = 0; i < 30; i += 2) { + Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS)); + } + Assert.assertEquals(0, offsetQueue.size()); --- End diff -- should do a `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` instead. > Get next offset and handle offset error in KafkaConsumer.MessageCallback > ------------------------------------------------------------------------ > > Key: TWILL-199 > URL: https://issues.apache.org/jira/browse/TWILL-199 > Project: Apache Twill > Issue Type: Improvement > Reporter: Chengfeng Mao > > The method {{void onReceived(Iterator<FetchedMessage> messages)}} in > {{KafkaConsumer.MessageCallback}} can be more flexible with the change to > {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide > additional functionalities: > 1. To return the next offset to be fetched > 2. To handle offset non-existence or offset mismatch error and take action on > the error > This method will return null for backward compatibility when it doesn't need > to provide the next offset. > In concrete implementation, a class of a new interface > {{KafkaOffsetProvider}} can be added as a member in > {{KafkaConsumer.MessageCallback}} to perform the offset error handling and > provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to > provide the following functionalities: > 1. To fetch earliest/latest offset in Kafka > 2. To find the offset of a message with timestamp equal to the given > timestamp in Kafka > For backward compatibility, if {{KafkaOffsetProvider}} instance is not > provided, its default value will be null and none of its methods will be > called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)