[
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15797777#comment-15797777
]
ASF GitHub Bot commented on TWILL-199:
--------------------------------------
Github user chtyim commented on a diff in the pull request:
https://github.com/apache/twill/pull/16#discussion_r94551779
--- Diff:
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0,
0).consume(new KafkaConsumer
.MessageCallback() {
@Override
- public void onReceived(Iterator<FetchedMessage> messages) {
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = Long.MIN_VALUE;
+ while (messages.hasNext()) {
+ FetchedMessage message = messages.next();
+ LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+ latch.countDown();
+ }
+ return nextOffset;
+ }
+
+ @Override
+ public void finished() {
+ stopLatch.countDown();
+ }
+ });
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ cancel.cancel();
+ Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testKafkaClientReadFromIdx() throws Exception {
+ String topic = "testClient";
+
+ // Publish 30 messages with indices the same as offsets within the
range 0 - 29
+ Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP,
"GZIP Testing message", 10);
+ t1.start();
+ t1.join();
+ Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE,
"Testing message", 10, 10);
+ t2.start();
+ t2.join();
+ Thread t3 = createPublishThread(kafkaClient, topic,
Compression.SNAPPY, "Snappy Testing message", 10, 20);
+ t3.start();
+ t3.join();
+
+ final int startIdx = 15;
+ final CountDownLatch latch = new CountDownLatch(30 - startIdx);
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+ // Create a consumer
+ final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer)
kafkaClient.getConsumer();
+ Cancellable initCancel =
kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
+ .MessageCallback() {
+ long minOffset = -2; // earliest msg
+ long maxOffset = -1; // latest msg
+ @Override
+ // Use binary search to find the offset of the message with the
index matching startIdx. Returns the next offset
+ // to fetch message until the matching message is found.
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ while (messages.hasNext()) {
+ FetchedMessage currentMsg = messages.next();
+ long currentOffset = currentMsg.getNextOffset() - 1;
+ String decodedMsg =
Charsets.UTF_8.decode(currentMsg.getPayload()).toString();
+ LOG.info(decodedMsg);
+ int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]);
+ LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}",
currentOffset, currentIdx, minOffset);
+ if (currentIdx == startIdx) {
+ if (offsetQueue.size() == 0) {
+ offsetQueue.offer(currentOffset);
+ LOG.info("currentOffset = {} matches startIdx {}",
currentOffset, startIdx);
+ }
+ return currentOffset;
+ }
+ // If minOffset and maxOffset still have their initial values,
set the minOffset to currentOffset and return
+ // the offset of the last received message
+ if (minOffset == -2 && maxOffset == -1) {
+ minOffset = currentOffset;
+ LOG.info("minOffset = {}, return maxOffset = {}", minOffset,
maxOffset);
+ // Returns the offset of the last received message. Cannot
return -1 because -1 will be translated as
+ // the next offset after the last received message
+ return consumer.getLastOffset(currentMsg.getTopicPartition(),
-1) - 1;
+ }
+ if (maxOffset == -1) {
+ maxOffset = currentOffset;
+ }
+ LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset);
+ // If minOffset > maxOffset, the startIdx cannot be found in the
current range of offset.
+ // Restore the initial values of minOffset and maxOffset and
read from the beginning again
+ if (minOffset > maxOffset) {
+ minOffset = -2;
+ maxOffset = -1;
+ LOG.info("minOffset > maxOffset, return minOffset = {}",
minOffset);
+ return minOffset;
+ }
+ if (currentIdx > startIdx) {
+ maxOffset = currentOffset - 1;
+ long newOffset = minOffset + (maxOffset - minOffset)/2;
+ LOG.info("currentIdx > startIdx, return newOffset {}",
newOffset);
+ return newOffset;
+ }
+ if (currentIdx < startIdx) {
+ minOffset = currentOffset + 1;
+ long newOffset = minOffset + (maxOffset - minOffset)/2;
+ LOG.info("currentIdx < startIdx, return newOffset {}",
newOffset);
+ return newOffset;
+ }
+ }
+ return Long.MIN_VALUE;
+ }
+
+ @Override
+ public void finished() {
+ //no-op
+ }
+ });
+
+ long startOffset = offsetQueue.poll(360, TimeUnit.SECONDS);
+ initCancel.cancel();
+
+ Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0,
startOffset).consume(new KafkaConsumer
+ .MessageCallback() {
--- End diff --
This is an awkward line break.
> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
> Issue Type: Improvement
> Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} can be more flexible with the change to
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide
> additional functionalities:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch error and take action on
> the error
> This method will return null for backward compatibility when it doesn't need
> to provide the next offset.
> In concrete implementation, a class of a new interface
> {{KafkaOffsetProvider}} can be added as a member in
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. Besides, {{KafkaOffsetProvider}} also has methods to
> provide the following functionalities:
> 1. To fetch earliest/latest offset in Kafka
> 2. To find the offset of a message with timestamp equal to the given
> timestamp in Kafka
> For backward compatibility, if {{KafkaOffsetProvider}} instance is not
> provided, its default value will be null and none of its methods will be
> called.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)