[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799721#comment-15799721
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94688950
  
--- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
 Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
   .MessageCallback() {
   @Override
-  public void onReceived(Iterator<FetchedMessage> messages) {
+  public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
+long nextOffset = startOffset;
 while (messages.hasNext()) {
-  LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
   latch.countDown();
 }
+return nextOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+
+Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+cancel.cancel();
+Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientSkipNext() throws Exception {
+String topic = "testClient";
+// Publish 30 messages with indecies the same as offsets within the range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+// 15 messages will be counted since onReceived returns `message.getNextOffset() + 1` as next offset to read
--- End diff --

I don't think the test is correct. You published 30 messages in three
message sets, so the `onReceived` method will be called three times: the
first time with messages 0-9, and you return 11; the second time with 11-19,
and you return 20; the last time with 21-29. So in total, more than 15
messages will be counted.
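The reviewer's count can be checked with a toy simulation. This is a sketch under stated assumptions, not Twill code: the partition holds offsets 0-29 in three message sets (0-9, 10-19, 20-29), each fetch delivers the rest of the message set containing the requested offset in one `onReceived` call, and the callback returns the last message's next offset plus one, as the test comment describes. `Main`, `findSet` and `deliveredOffsets` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class Main {

    // Hypothetical model of the partition: three message sets, [0,10), [10,20), [20,30).
    static final long[][] MESSAGE_SETS = {{0, 10}, {10, 20}, {20, 30}};

    /** Returns the message set containing the offset, or null if it is past the end. */
    static long[] findSet(long offset) {
        for (long[] set : MESSAGE_SETS) {
            if (offset >= set[0] && offset < set[1]) {
                return set;
            }
        }
        return null;
    }

    /**
     * Simulates the consumer loop and returns every offset handed to onReceived.
     * Each iteration models one onReceived call on the rest of one message set.
     */
    static List<Long> deliveredOffsets() {
        List<Long> delivered = new ArrayList<>();
        long startOffset = 0;
        long[] set;
        while ((set = findSet(startOffset)) != null) {
            long lastNextOffset = startOffset;
            for (long offset = startOffset; offset < set[1]; offset++) {
                delivered.add(offset);       // one latch.countDown() per message
                lastNextOffset = offset + 1; // what getNextOffset() would report
            }
            startOffset = lastNextOffset + 1; // the callback under review adds 1
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Long> delivered = deliveredOffsets();
        System.out.println(delivered.size() + " messages counted: " + delivered);
    }
}
```

Under this model the loop counts 28 messages (only offsets 10 and 20 are skipped), so the test's expectation of 15 does not hold.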


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> 
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
>  Issue Type: Improvement
>Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator messages)}} in
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to
> {{Long onReceived(Iterator messages)}}, so that it can provide
> additional functionality:
> 1. Return the next offset to be fetched.
> 2. Handle a non-existent offset or an offset mismatch error, and take action
> on the error.
> For backward compatibility, the method returns null when it doesn't need to
> provide the next offset.
> As a concrete implementation, an instance of a new interface,
> {{KafkaOffsetProvider}}, can be added as a member of
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. {{KafkaOffsetProvider}} also has methods to:
> 1. Fetch the earliest/latest offset in Kafka.
> 2. Find the offset of the message whose timestamp equals a given timestamp
> in Kafka.
> For backward compatibility, if no {{KafkaOffsetProvider}} instance is
> provided, it defaults to null and none of its methods are called.
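A minimal sketch of the proposed contract, assuming simplified stand-ins for Twill's types: `FetchedMessage` here exposes only `getNextOffset()`, and the three `KafkaOffsetProvider` method names are hypothetical, chosen to mirror the two capabilities listed above. A null return keeps the existing behaviour (continue after the last message), and with no provider configured none of its methods are called. Restarting from the earliest offset on an error is an illustrative choice, not something the issue specifies.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class Main {

    // Hypothetical stand-in for Twill's FetchedMessage; only the next offset matters here.
    interface FetchedMessage {
        long getNextOffset();
    }

    // Callback shape proposed in TWILL-199: return the next offset, or null for the default.
    interface MessageCallback {
        Long onReceived(Iterator<FetchedMessage> messages);
    }

    // Hypothetical version of the proposed KafkaOffsetProvider; method names are illustrative.
    interface KafkaOffsetProvider {
        long getEarliestOffset();
        long getLatestOffset();
        long getOffsetForTimestamp(long timestamp);
    }

    /** Null from the callback means "keep the default": the last message's next offset. */
    static long resolveNextOffset(MessageCallback callback, List<FetchedMessage> batch,
                                  long currentOffset) {
        long defaultOffset = batch.isEmpty()
            ? currentOffset : batch.get(batch.size() - 1).getNextOffset();
        Long requested = callback.onReceived(batch.iterator());
        return requested == null ? defaultOffset : requested;
    }

    /** On an offset error, consult the provider if one was given; otherwise rethrow. */
    static long recoverFromOffsetError(KafkaOffsetProvider provider, RuntimeException error) {
        if (provider == null) {
            throw error; // no provider configured: none of its methods are called
        }
        return provider.getEarliestOffset(); // illustrative choice only
    }

    static List<FetchedMessage> sampleBatch() {
        FetchedMessage first = () -> 11L;  // message at offset 10
        FetchedMessage second = () -> 12L; // message at offset 11
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        MessageCallback legacy = messages -> null;   // old behaviour: no offset returned
        MessageCallback skipAhead = messages -> 20L; // explicitly request offset 20
        System.out.println(resolveNextOffset(legacy, sampleBatch(), 10));    // 12
        System.out.println(resolveNextOffset(skipAhead, sampleBatch(), 10)); // 20
    }
}
```

The boxed `Long` return type is what makes a null "no opinion" value possible; the later revisions of the pull request quoted elsewhere in this thread use a primitive `long` plus a `startOffset` parameter instead.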



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799722#comment-15799722
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94688058
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator<FetchedMessage> messages) {
+public long onReceived(final Iterator<FetchedMessage> messages, final long startOffset) {
   if (stopped.get()) {
-return;
+return startOffset;
   }
-  Futures.getUnchecked(executor.submit(new Runnable() {
+  return Futures.getUnchecked(executor.submit(new Callable<Long>() {
+long nextOffset = startOffset;
--- End diff --

You don't need this local variable. If the consumer is stopped, the callable
always returns `startOffset`. Otherwise, the callable returns whatever the
`callback` delegate returns.
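The reviewer's suggestion can be sketched with plain JDK concurrency types. This is a simplified model, not the actual `SimpleKafkaConsumer` code: messages are plain strings, `MessageCallback` is a reduced stand-in, and `getUnchecked` approximates Guava's `Futures.getUnchecked`. The wrapper never needs a mutable `nextOffset`: if the consumer is stopped (before submission, or while the task waits in the queue) it returns `startOffset`; otherwise the `Callable` returns the delegate's result directly.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    // Reduced, hypothetical callback: messages are plain strings in this sketch.
    interface MessageCallback {
        long onReceived(Iterator<String> messages, long startOffset);
    }

    // Plain-JDK approximation of Guava's Futures.getUnchecked.
    static <T> T getUnchecked(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    /** Runs the delegate on the executor without any mutable local offset variable. */
    static MessageCallback wrapCallback(MessageCallback callback, ExecutorService executor,
                                        AtomicBoolean stopped) {
        return (messages, startOffset) -> {
            if (stopped.get()) {
                return startOffset; // already stopped: nothing consumed, offset unchanged
            }
            Future<Long> result = executor.submit(() -> {
                if (stopped.get()) {
                    return startOffset; // stopped while this task was queued
                }
                return callback.onReceived(messages, startOffset);
            });
            return getUnchecked(result);
        };
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean stopped = new AtomicBoolean();
        // Delegate consumes every message and returns the offset after the last one.
        MessageCallback delegate = (messages, startOffset) -> {
            long next = startOffset;
            while (messages.hasNext()) {
                messages.next();
                next++;
            }
            return next;
        };
        MessageCallback wrapped = wrapCallback(delegate, executor, stopped);
        System.out.println(wrapped.onReceived(Arrays.asList("a", "b", "c").iterator(), 5)); // 8
        stopped.set(true);
        System.out.println(wrapped.onReceived(Arrays.asList("d").iterator(), 8)); // 8
        executor.shutdown();
    }
}
```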






[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94687750
  
--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -35,12 +35,14 @@
  * Invoked when new messages is available.
  * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
  * and across different invocation.
+ * @param startOffset Offset of the current message to be consumed.
+ * @return The offset of the message to be fetched next.
  */
-void onReceived(Iterator<FetchedMessage> messages);
+long onReceived(Iterator<FetchedMessage> messages, long startOffset);
--- End diff --

Can you reverse the order of the parameters? Logically, the consumer fetches
from `startOffset` to produce the `Iterator`, so `startOffset` should come first.
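With the parameters reordered, the signature follows the data flow: fetch from `startOffset`, then hand over the resulting messages. The sketch below assumes a toy in-memory log and a minimal `FetchedMessage` stand-in (both hypothetical, as are `fetch`, `consume` and `readAll`); the offset returned by the callback drives the next fetch, and the loop stops once a fetch comes back empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class Main {

    // Minimal, hypothetical stand-in for Twill's FetchedMessage.
    static final class FetchedMessage {
        final long offset;
        final String payload;

        FetchedMessage(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        long getNextOffset() {
            return offset + 1;
        }
    }

    // Reordered signature: the offset fetched from comes first, then the resulting messages.
    interface MessageCallback {
        long onReceived(long startOffset, Iterator<FetchedMessage> messages);
    }

    /** Toy fetch from an in-memory log: up to maxMessages starting at startOffset. */
    static Iterator<FetchedMessage> fetch(List<String> log, long startOffset, int maxMessages) {
        List<FetchedMessage> batch = new ArrayList<>();
        for (long o = startOffset; o < log.size() && batch.size() < maxMessages; o++) {
            batch.add(new FetchedMessage(o, log.get((int) o)));
        }
        return batch.iterator();
    }

    /** The offset returned by the callback drives the next fetch; stops at the end of the log. */
    static void consume(List<String> log, long startOffset, int maxMessages, MessageCallback callback) {
        long offset = startOffset;
        while (true) {
            Iterator<FetchedMessage> messages = fetch(log, offset, maxMessages);
            if (!messages.hasNext()) {
                return;
            }
            offset = callback.onReceived(offset, messages);
        }
    }

    /** Collects every payload from startOffset onward. */
    static List<String> readAll(List<String> log, long startOffset, int maxMessages) {
        List<String> seen = new ArrayList<>();
        consume(log, startOffset, maxMessages, (offset, messages) -> {
            long next = offset;
            while (messages.hasNext()) {
                FetchedMessage message = messages.next();
                seen.add(message.payload);
                next = message.getNextOffset();
            }
            return next;
        });
        return seen;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        System.out.println(readAll(log, 1, 2)); // [m1, m2, m3, m4]
    }
}
```

A callback that kept returning its own `startOffset` would loop forever in this sketch, so a real consumer would need some guard against that.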


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15799233#comment-15799233
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94658662
  
--- Diff: twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java ---
@@ -32,7 +32,7 @@
   Cancellable announce(String serviceName, int port);
 
   /**
-   * Registers an endpoint that could be discovered by external party with a payload
+   * Registers an endpoint that could be discovered by external party with a payload.
--- End diff --

Checkstyle fix: the first sentence of a Javadoc comment must end with a period.




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797904#comment-15797904
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94561460
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator<FetchedMessage> messages) {
+public long onReceived(final Iterator<FetchedMessage> messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
   }
-  Futures.getUnchecked(executor.submit(new Runnable() {
+  return Futures.getUnchecked(executor.submit(new Callable<Long>() {
+long nextOffset = Long.MIN_VALUE;
 @Override
-public void run() {
+public Long call() {
   if (stopped.get()) {
-return;
+return nextOffset;
   }
-  callback.onReceived(messages);
+  nextOffset = callback.onReceived(messages);
--- End diff --

I was assuming that this `onReceived` can be called multiple times.
Therefore, at line 286, when `stopped` is set to true, the messages from
previous calls of `onReceived` should be skipped.




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797901#comment-15797901
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94561337
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator<FetchedMessage> messages) {
+public long onReceived(final Iterator<FetchedMessage> messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Using an `AtomicLong` as a mutable parameter is not good. In general,
immutability gives a cleaner API.




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797882#comment-15797882
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94559673
  
--- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
 Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
   .MessageCallback() {
   @Override
-  public void onReceived(Iterator<FetchedMessage> messages) {
+  public long onReceived(Iterator<FetchedMessage> messages) {
+long nextOffset = Long.MIN_VALUE;
+while (messages.hasNext()) {
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+  latch.countDown();
+}
+return nextOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+
+Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+cancel.cancel();
+Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientReadFromIdx() throws Exception {
+String topic = "testClient";
+
+// Publish 30 messages with indecies the same as offsets within the range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+final int startIdx = 15;
+final CountDownLatch latch = new CountDownLatch(30 - startIdx);
+final CountDownLatch stopLatch = new CountDownLatch(1);
+final BlockingQueue offsetQueue = new LinkedBlockingQueue<>();
+// Creater a consumer
+final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
+Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
+  .MessageCallback() {
+  long minOffset = -2; // earliest msg
+  long maxOffset = -1; // latest msg
+  @Override
+  // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
--- End diff --

Right, since the offsets and indices of the messages are known, it can jump
directly to the desired index without a binary search.
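Because the test publishes messages whose indices equal their offsets, the callback can skip straight to `startIdx` on the first call instead of searching. A sketch of that idea, assuming the reordered `(startOffset, messages)` callback shape and a toy in-memory log of offsets fetched in fixed-size batches; `readFrom` and `run` are hypothetical names, and message payloads are reduced to their offsets.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Main {

    // Hypothetical callback shape: (startOffset, offsets of the fetched messages) -> next offset.
    interface MessageCallback {
        long onReceived(long startOffset, Iterator<Long> messageOffsets);
    }

    /** Jumps straight to targetOffset, then records every message from there on. */
    static MessageCallback readFrom(long targetOffset, List<Long> received) {
        return (startOffset, messageOffsets) -> {
            if (startOffset < targetOffset) {
                return targetOffset; // index == offset, so no binary search is needed
            }
            long next = startOffset;
            while (messageOffsets.hasNext()) {
                long offset = messageOffsets.next();
                received.add(offset);
                next = offset + 1;
            }
            return next;
        };
    }

    /** Toy consumer loop over offsets [0, logSize), fetched in batches of batchSize. */
    static List<Long> run(long logSize, int batchSize, long targetOffset) {
        List<Long> received = new ArrayList<>();
        MessageCallback callback = readFrom(targetOffset, received);
        long offset = 0;
        while (offset < logSize) {
            List<Long> batch = new ArrayList<>();
            for (long o = offset; o < logSize && batch.size() < batchSize; o++) {
                batch.add(o);
            }
            offset = callback.onReceived(offset, batch.iterator());
        }
        return received;
    }

    public static void main(String[] args) {
        List<Long> received = run(30, 10, 15);
        System.out.println(received.size() + " messages, from " + received.get(0)
            + " to " + received.get(received.size() - 1)); // 15 messages, from 15 to 29
    }
}
```

With `startIdx = 15`, this delivers exactly `30 - startIdx` = 15 messages, which is the count the test's `CountDownLatch` waits for.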





[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797867#comment-15797867
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94558645
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

Yes, it's an implementation detail. I was just wondering about future work.





[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797861#comment-15797861
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94558457
  
--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator<FetchedMessage> messages) {
+public long onReceived(final Iterator<FetchedMessage> messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Maybe pass an `AtomicLong` to this method that contains the initial offset
and can be set to the next offset? I just realized this is probably an
implementation detail of `SimpleKafkaConsumer` and is not general enough to
belong in the interface.


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> 
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
>  Issue Type: Improvement
>Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator messages)}} in 
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to 
> {{Long onReceived(Iterator messages)}} so that it can provide 
> additional functionality:
> 1. To return the next offset to be fetched
> 2. To handle nonexistent-offset or offset-mismatch errors and act on them
> This method will return null for backward compatibility when it doesn't need 
> to provide the next offset.
> Concretely, a class implementing a new interface 
> {{KafkaOffsetProvider}} can be added as a member of 
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and 
> provide the next offset. {{KafkaOffsetProvider}} also has methods 
> providing the following functionality:
> 1. To fetch the earliest/latest offset in Kafka
> 2. To find the offset of the message whose timestamp equals the given 
> timestamp in Kafka
> For backward compatibility, if no {{KafkaOffsetProvider}} instance is 
> provided, its default value will be null and none of its methods will be 
> called.
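The backward-compatibility rule in the proposal (a null return means "keep the default next offset") can be sketched as below. `NullableOffsetCallback` and `NextOffsetResolver` are hypothetical names, and the batch is modeled as a list of message offsets rather than real `FetchedMessage` objects; this illustrates the proposed contract, not Twill code.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical shape of the proposed callback; not the Twill API. */
interface NullableOffsetCallback {
  // Returns the next offset to fetch, or null to keep the consumer's default.
  Long onReceived(Iterator<Long> messageOffsets);
}

final class NextOffsetResolver {
  // Picks the next fetch offset after one batch. 'defaultNext' is the offset the
  // consumer would use on its own (the one after the last message in the batch).
  static long resolve(List<Long> batchOffsets, long defaultNext, NullableOffsetCallback callback) {
    Long requested = callback.onReceived(batchOffsets.iterator());
    // null preserves the old void-method behavior, which is the point of the proposal.
    return requested == null ? defaultNext : requested;
  }
}
```

Using a boxed `Long` with null as "no opinion" is exactly the kind of special value that the later review comments push back on.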



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94558457
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Maybe pass an `AtomicLong` to this method that contains the initial 
offset and can be set to the next offset? I just realized this is probably an 
implementation detail of `SimpleKafkaConsumer` and is not general enough to 
belong in the interface.




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797849#comment-15797849
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94557480
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Why? The offset passed to this method is the offset used for the fetch 
that created the iterator. The offset returned, on the other hand, governs 
the offset to use for the next fetch.
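The contract described here (offset of the current fetch in, offset of the next fetch out) can be illustrated with a simulated fetch loop over an in-memory log. This is a sketch under assumptions: `FetchCallback` and `SimulatedFetchLoop` are hypothetical names, list position stands in for the Kafka offset, and `String` payloads stand in for `FetchedMessage`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Assumed callback shape: the offset used for this fetch comes in, the next one goes out. */
interface FetchCallback {
  long onReceived(Iterator<String> messages, long startOffset);
}

final class SimulatedFetchLoop {
  // Runs 'rounds' fetches against an in-memory log whose list index is the offset.
  // Each fetch returns up to 'batchSize' messages starting at the current offset.
  // Returns the start offset of every fetch, in order.
  static List<Long> run(List<String> log, long initialOffset, int batchSize,
                        int rounds, FetchCallback callback) {
    List<Long> fetchOffsets = new ArrayList<>();
    long offset = initialOffset;
    for (int i = 0; i < rounds; i++) {
      fetchOffsets.add(offset);
      int from = (int) Math.max(0L, Math.min(offset, (long) log.size()));
      int to = Math.min(from + batchSize, log.size());
      Iterator<String> batch = log.subList(from, to).iterator();
      // The returned value, not the one passed in, drives the next fetch.
      offset = callback.onReceived(batch, offset);
    }
    return fetchOffsets;
  }
}
```

A callback that counts the messages it consumes advances the loop normally, while one that returns `startOffset` unchanged makes the consumer re-fetch the same batch.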




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94557551
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

It's independent of this PR, isn't it?




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94557480
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Why? The offset passed to this method is the offset used for the fetch 
that created the iterator. The offset returned, on the other hand, governs 
the offset to use for the next fetch.




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797840#comment-15797840
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94556442
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

If the offset is passed to `onReceived`, it seems there is no need to 
return an offset, since the returned offset is now used to set the offset 
outside of the `onReceived` method?




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797824#comment-15797824
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94555234
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

Right, this can be done in the unit test. To phrase my question better: 
what would be a better way to perform the same logic as `getLastOffset` outside 
of `SimpleKafkaConsumer`? Maybe expose the parameter passed to the 
`SimpleKafkaConsumer` constructor in `ZKKafkaClientService`?




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94551779
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
 Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
   .MessageCallback() {
   @Override
-  public void onReceived(Iterator messages) {
+  public long onReceived(Iterator messages) {
+long nextOffset = Long.MIN_VALUE;
+while (messages.hasNext()) {
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+  latch.countDown();
+}
+return nextOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+
+Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+cancel.cancel();
+Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientReadFromIdx() throws Exception {
+String topic = "testClient";
+
+// Publish 30 messages with indices the same as offsets within the range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+final int startIdx = 15;
+final CountDownLatch latch = new CountDownLatch(30 - startIdx);
+final CountDownLatch stopLatch = new CountDownLatch(1);
+final BlockingQueue offsetQueue = new LinkedBlockingQueue<>();
+// Create a consumer
+final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
+Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
+  .MessageCallback() {
+  long minOffset = -2; // earliest msg
+  long maxOffset = -1; // latest msg
+  @Override
+  // Use binary search to find the offset of the message whose index matches startIdx. Returns the next offset
+  // to fetch messages from until the matching message is found.
+  public long onReceived(Iterator messages) {
+while (messages.hasNext()) {
+  FetchedMessage currentMsg = messages.next();
+  long currentOffset = currentMsg.getNextOffset() - 1;
+  String decodedMsg = Charsets.UTF_8.decode(currentMsg.getPayload()).toString();
+  LOG.info(decodedMsg);
+  int currentIdx = Integer.valueOf(decodedMsg.split(" ")[0]);
+  LOG.info("Current offset = {}, currentIdx = {}. minOffset = {}", currentOffset, currentIdx, minOffset);
+  if (currentIdx == startIdx) {
+if (offsetQueue.size() == 0) {
+  offsetQueue.offer(currentOffset);
+  LOG.info("currentOffset = {} matches startIdx {}", currentOffset, startIdx);
+}
+return currentOffset;
+  }
+  // If minOffset and maxOffset still have their initial values, set minOffset to currentOffset and return
+  // the offset of the last received message
+  if (minOffset == -2 && maxOffset == -1) {
+minOffset = currentOffset;
+LOG.info("minOffset = {}, return maxOffset = {}", minOffset, maxOffset);
+// Returns the offset of the last received message. Cannot return -1 because -1 will be translated as
+// the next offset after the last received message
+return consumer.getLastOffset(currentMsg.getTopicPartition(), -1) - 1;
+  }
+  if (maxOffset == -1) {
+maxOffset = currentOffset;
+  }
+  LOG.info("minOffset = {}, maxOffset = {}", minOffset, maxOffset);
+  // If minOffset > maxOffset, startIdx cannot be found in the current range of offsets.
+  // Restore the initial values of minOffset and maxOffset and read from the beginning again
+  if (minOffset > maxOffset) {
+minOffset = -2;
+maxOffset = -1;
+LOG.info("minOffset > maxOffset, return minOffset = {}", minOffset);
+return minOffset;
+  }
+  if (currentIdx > startIdx) {
+maxOffset = currentOffset - 1;
+long newOffset = 

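The test above drives a binary search through successive consumer fetches. Stripped of the Kafka plumbing, the underlying search can be sketched over an in-memory log where list position stands in for the offset. Assumptions: `OffsetBinarySearch` is a hypothetical name, each payload starts with its integer index (as the test's `split(" ")[0]` parsing implies), and indices increase monotonically with offset.

```java
import java.util.List;

final class OffsetBinarySearch {
  // Parses the leading integer index from a payload such as "15 Testing message".
  static int leadingIndex(String payload) {
    return Integer.parseInt(payload.split(" ")[0]);
  }

  // Returns the offset of the message whose index equals targetIdx, or -1 if absent.
  // Assumes indices increase monotonically with offset, as in the test data.
  static long find(List<String> log, int targetIdx) {
    long lo = 0;
    long hi = log.size() - 1;
    while (lo <= hi) {
      long mid = lo + (hi - lo) / 2;
      int idx = leadingIndex(log.get((int) mid));
      if (idx == targetIdx) {
        return mid;
      } else if (idx < targetIdx) {
        lo = mid + 1; // target lies at a higher offset
      } else {
        hi = mid - 1; // target lies at a lower offset
      }
    }
    return -1;
  }
}
```

In the callback version, each probe costs a fetch, which is why the test keeps `minOffset`/`maxOffset` as callback state and returns the next probe offset instead of looping locally.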
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797757#comment-15797757
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94551508
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Using a special value is an anti-pattern. Since you are changing the 
`MessageCallback` API already, it's probably better to pass the offset used 
for the fetch call to the `onReceived` method.
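With the fetch offset passed in, the stopped branch of the wrapper no longer needs a sentinel such as `Long.MIN_VALUE`: it can simply hand back the offset it was given. A minimal sketch, assuming the `long onReceived(Iterator, long startOffset)` shape under discussion; `OffsetCallback` and `StoppableCallback` are hypothetical names and `String` stands in for `FetchedMessage`.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Assumed callback shape: offset of this fetch in, offset of the next fetch out. */
interface OffsetCallback {
  long onReceived(Iterator<String> messages, long startOffset);
}

final class StoppableCallback implements OffsetCallback {
  private final OffsetCallback delegate;
  private final AtomicBoolean stopped = new AtomicBoolean();

  StoppableCallback(OffsetCallback delegate) {
    this.delegate = delegate;
  }

  void stop() {
    stopped.set(true);
  }

  @Override
  public long onReceived(Iterator<String> messages, long startOffset) {
    if (stopped.get()) {
      // No sentinel: report the fetch offset unchanged, which is always a
      // meaningful offset for the caller.
      return startOffset;
    }
    return delegate.onReceived(messages, startOffset);
  }
}
```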




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15797752#comment-15797752
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94551352
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

That's not a good reason to make this class public. Why does the unit test 
have to use this method? Can't the unit test just return some offset that it 
knows of?
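Because the test publishes its messages at known offsets (0 to 29), a test callback could jump to a hard-coded offset instead of asking `SimpleKafkaConsumer.getLastOffset`. A minimal sketch of that idea; `SkipToKnownOffsetCallback` is a hypothetical name, it uses the `startOffset`-in, next-offset-out contract under discussion, and it assumes contiguous offsets.

```java
import java.util.Iterator;

/** Hypothetical test callback that skips to an offset the test already knows. */
final class SkipToKnownOffsetCallback {
  private final long knownOffset;
  private boolean skipped;

  SkipToKnownOffsetCallback(long knownOffset) {
    this.knownOffset = knownOffset;
  }

  // startOffset in, next fetch offset out.
  long onReceived(Iterator<String> messages, long startOffset) {
    if (!skipped) {
      skipped = true;
      return knownOffset; // first batch: jump straight to the known offset
    }
    long next = startOffset;
    while (messages.hasNext()) {
      messages.next();
      next++; // assumes contiguous offsets in the test topic
    }
    return next;
  }
}
```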

