[jira] [Commented] (TWILL-63) Speed up application launch time

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803888#comment-15803888
 ] 

ASF GitHub Bot commented on TWILL-63:
-

GitHub user chtyim opened a pull request:

https://github.com/apache/twill/pull/21

(TWILL-63) Speed up application launch time

The general approach is better jar file management, plus caching and reuse of 
jar files created through class dependency tracing. The changes break down as 
follows:

1. Refactor jar generation
  - One jar containing the TwillLauncher (launcher.jar), created through 
    dependency tracing.
    - This jar is the same for all applications.
  - One jar containing all Twill classes (twill.jar), created through 
    dependency tracing.
    - This jar is the same for all applications.
  - One jar containing the application class, created through dependency 
    tracing.
    - This jar is generated based on the application being launched. It is 
      reusable when launching the same app multiple times.
  - One jar containing user resources set up through TwillPreparer.
    - This jar is not reused between apps.
  - One jar containing the runtime config needed by Twill
    - logback.xml, JVM opts, environment, classpaths, etc.
2. Let YARN expand jars when localizing to containers instead of 
   expanding them programmatically
  - This saves jar expansion time when multiple containers are running on 
    the same host
3. Introduce a new configuration "twill.location.cache.dir" to enable jar 
   caching and reuse
  - Currently only launcher.jar, twill.jar and the application jar are 
    cached and reused when possible
  - Cache cleanup logic is also in place to remove files in the cache directory 
    that are no longer used by an application
4. The ApplicationBundler is improved to allow more flexible usage
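
The caching and cleanup idea in item 3 can be sketched roughly as follows. This is not the code from the pull request: the class `JarCache`, its method names, and the use of a local directory (rather than a location on HDFS) are assumptions made purely for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: reuse a jar from a cache directory when one with the
// same name already exists, otherwise build it once and keep it for later launches.
final class JarCache {
  private final Path cacheDir;

  JarCache(Path cacheDir) {
    this.cacheDir = cacheDir;
  }

  // Returns the cached jar if present; otherwise asks the builder to create it.
  Path getOrCreate(String jarName, Consumer<Path> builder) {
    try {
      Files.createDirectories(cacheDir);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Path jar = cacheDir.resolve(jarName);
    if (!Files.exists(jar)) {
      builder.accept(jar);
    }
    return jar;
  }

  // Deletes every cached file whose name is not in the set still in use
  // and returns how many files were removed.
  int cleanup(Set<String> namesInUse) {
    int removed = 0;
    try (DirectoryStream<Path> files = Files.newDirectoryStream(cacheDir)) {
      for (Path file : files) {
        if (!namesInUse.contains(file.getFileName().toString())) {
          Files.delete(file);
          removed++;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return removed;
  }
}
```

In this sketch the jar name is the cache key; a real cache would more likely need a key that changes when the jar contents change.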

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chtyim/twill feature/twill-63

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/twill/pull/21.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21


commit 786f3c6e075c15d929b43e322f8a869963a95b81
Author: Terence Yim 
Date:   2016-12-07T01:05:11Z

(TWILL-63) Speed up application launch time





> Speed up application launch time
> 
>
> Key: TWILL-63
> URL: https://issues.apache.org/jira/browse/TWILL-63
> Project: Apache Twill
>  Issue Type: Improvement
>  Components: yarn
> Affects Versions: 0.2.0-incubating
> Reporter: Terence Yim
> Assignee: Terence Yim
> Fix For: 0.10.0
>
>
> Currently, when launching an application, two new jars are always created 
> locally, one for the AM (appMaster.jar) and one for the containers 
> (container.jar), and copied to HDFS before submitting the application. The 
> jar files can be big, and if they have not changed, they should not require 
> copying to HDFS again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] twill pull request #21: (TWILL-63) Speed up application launch time

2017-01-05 Thread chtyim
GitHub user chtyim opened a pull request:

https://github.com/apache/twill/pull/21





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Retrieving failure when app terminates abnormally

2017-01-05 Thread Yuliya Feldman
Hello there,

I am trying to use the async APIs to start/stop a Twill-managed YARN application.

I am using the onRunning() and onTerminated() APIs for this, but I don't see a
way of retrieving the error in case of failure:

public void onTerminated(final Runnable runnable, Executor executor) {
  this.addListener(new ServiceListenerAdapter() {
    public void failed(State from, Throwable failure) {
      runnable.run();
    }

    public void terminated(State from) {
      runnable.run();
    }
  }, executor);
}


Is there a way of retrieving the "Throwable failure"?

Or am I using the wrong APIs?
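
For illustration only: since the `Runnable` passed to onTerminated() takes no arguments, one conceivable approach is a listener that records the failure itself. The sketch below uses minimal stand-in types (`TerminalListener`, `FailureCapture`) that only mimic the shape of the adapter in the snippet above; they are hypothetical and not Twill or Guava classes, and whether a given Twill version lets you register such a listener would need to be checked.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a listener with the two terminal callbacks
// seen in the snippet above (the State argument is omitted for brevity).
interface TerminalListener {
  void failed(Throwable failure);

  void terminated();
}

// Records the outcome in a future so callers can inspect the Throwable later.
final class FailureCapture implements TerminalListener {
  private final CompletableFuture<Void> completion = new CompletableFuture<>();

  @Override
  public void failed(Throwable failure) {
    // Abnormal termination: expose the cause to whoever holds the future.
    completion.completeExceptionally(failure);
  }

  @Override
  public void terminated() {
    // Clean termination: complete without an error.
    completion.complete(null);
  }

  CompletableFuture<Void> completion() {
    return completion;
  }
}
```

A caller could then attach `completion().whenComplete((v, failure) -> ...)` and treat a non-null `failure` as the abnormal-termination case.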

Thanks


[jira] [Assigned] (TWILL-63) Speed up application launch time

2017-01-05 Thread Terence Yim (JIRA)

 [ 
https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terence Yim reassigned TWILL-63:


Assignee: Terence Yim  (was: Shankar Selvam)



[jira] [Updated] (TWILL-63) Speed up application launch time

2017-01-05 Thread Terence Yim (JIRA)

 [ 
https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terence Yim updated TWILL-63:
-
Fix Version/s: (was: 0.9.0)
   0.10.0



Re: Desired behavior at shutdown

2017-01-05 Thread Terence Yim
Hi Martin,

I do agree that the AM should only shut down the embedded Kafka server once all 
the controllers have seen all the logs. However, the difficulty is: how does the 
AM know about it? The Twill controller uses the simple Kafka API instead of 
the higher-level one (that one involves checkpointing to ZK, and we don't 
want many running Twill apps to put a heavy load on ZK). Do you have any 
suggestions on how to do that?

Thanks,
Terence


> On Jan 5, 2017, at 2:42 PM, Martin Serrano  wrote:
> 
> Actually, after further investigation, I realize the server side has to be 
> dealt with because it is shutting down the Kafka broker before all the 
> messages are read from it.  I see that there is a 2 second delay for clients 
> to pull what they can first.  What would folks think about an algorithm that 
> checked the topic for unread messages and had a longer timeout (say 30s) as 
> long as there were messages to be received still?  Is there an issue that the 
> client may not be present on the other side and that the delay of shutting 
> down the AM would be undesirable?
> 
> -Martin
> 
>> On 01/05/2017 12:32 PM, Martin Serrano wrote:
>> 
>> All,
>> 
>> I'm encountering a situation on a fast machine where the Kafka log 
>> aggregation topic is not empty when the system shuts down. The scenario:
>> 
>>  *  log consumer consumes all messages
>>  * consumer sleeps (500ms) due to empty queue
>>  * containers exit, posting /final log messages/ about why
>>  * controller notices containers are down and terminates consumers.
>>  * consumer is interrupted from sleep but has been canceled, so it
>>does not get the rest of the messages.
>> 
>> This scenario can be really confusing during development because an error 
>> may be missed (as in my case) if it falls into the /final log messages/.  
>> Before I file a ticket and fix this, I wanted to get some feedback.  Looking 
>> at org.apache.twill.internal.kafka.client.SimpleKafkaConsumer it seems this 
>> behavior could be intentional given this log message (line 384):
>> 
>>LOG.debug("Unable to fetch messages on {}, kafka consumer service 
>> shutdown is in progress.", topicPart);
>> 
>> My opinion is that final messages logged by a container are likely to be 
>> critical in diagnosing errors and that twill should do whatever it can to 
>> forward them before shutting things down. If there is agreement on this I'll 
>> file a ticket and fix it.  My general approach would be to indicate to the 
>> consumer that it is in a shuttingDown state which it would use to break from 
>> the consume loop once the message set was empty.  If this makes sense would 
>> we need to support a timeout for the maximum amount of time to be in this 
>> state before punting on the rest of the messages?  My instinct is no, get 
>> them all, but given the way the code is set up now, perhaps there are good 
>> reasons to timeout.
>> 
>> Thanks,
>> 
>> Martin Serrano
>> 
> 


[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802862#comment-15802862
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94874821
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

If one specified `-1L` or `-2L`, but no message is consumed in the method, 
then `-1L` or `-2L` still has to be returned


> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> 
>
> Key: TWILL-199
> URL: https://issues.apache.org/jira/browse/TWILL-199
> Project: Apache Twill
>  Issue Type: Improvement
> Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in 
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to 
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide 
> additional functionality:
> 1. Return the next offset to be fetched
> 2. Handle offset non-existence or offset mismatch errors and take action on 
> the error
> This method will return null for backward compatibility when it doesn't need 
> to provide the next offset.
> In a concrete implementation, an instance of a new interface 
> {{KafkaOffsetProvider}} can be added as a member of 
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and 
> provide the next offset. In addition, {{KafkaOffsetProvider}} has methods to 
> provide the following functionality:
> 1. Fetch the earliest/latest offset in Kafka
> 2. Find the offset of a message with a timestamp equal to the given 
> timestamp in Kafka
> For backward compatibility, if a {{KafkaOffsetProvider}} instance is not 
> provided, it defaults to null and none of its methods will be 
> called.
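
A minimal sketch of the idea in the issue description, assuming a simplified callback that sees message offsets only. `OffsetCallback` and `OffsetLoop` are hypothetical names for illustration and are not the Twill interfaces; the loop simply shows how a returned offset could steer the next fetch while a null return keeps the old sequential behaviour.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a callback that may return the next offset to fetch.
interface OffsetCallback {
  // Returns the next offset to fetch, or null to keep the default behaviour.
  Long onReceived(Iterator<Long> messageOffsets);
}

final class OffsetLoop {
  // Feeds offsets [0, endOffset) to the callback one message at a time and
  // records the offset each call started from.
  static List<Long> run(long endOffset, OffsetCallback callback) {
    List<Long> starts = new ArrayList<>();
    long next = 0;
    while (next < endOffset) {
      long current = next;
      starts.add(current);
      Long requested = callback.onReceived(Collections.singletonList(current).iterator());
      long proposed = (requested != null) ? requested : current + 1;
      if (proposed <= current) {
        break; // sketch-only guard against a callback that never advances
      }
      next = proposed;
    }
    return starts;
  }
}
```

For example, `OffsetLoop.run(6, it -> it.next() + 2)` visits offsets 0, 2 and 4, while a callback that always returns null visits every offset in order.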





[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94874821
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

If one specified `-1L` or `-2L`, but no message is consumed in the method, 
then `-1L` or `-2L` still has to be returned




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802857#comment-15802857
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94874547
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

or you change the code in a way that this statement is correct.




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94874547
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

or you change the code in a way that this statement is correct.




Re: Desired behavior at shutdown

2017-01-05 Thread Martin Serrano
Actually, after further investigation, I realize the server side has to 
be dealt with because it is shutting down the Kafka broker before all 
the messages are read from it.  I see that there is a 2 second delay for 
clients to pull what they can first.  What would folks think about an 
algorithm that checked the topic for unread messages and had a longer 
timeout (say 30s) as long as there were messages to be received still?  
Is there an issue that the client may not be present on the other side 
and that the delay of shutting down the AM would be undesirable?
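
The proposed delay could be sketched as below. This is only an illustration of the timing logic: `hasUnread` is a hypothetical probe for "the topic still has unread messages", and how the AM would actually obtain that information is exactly the open question in this thread. The 2 second minimum and 30 second maximum would be passed in as parameters.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the proposed shutdown delay: always wait the minimum
// delay, then keep waiting while unread messages remain, up to a maximum.
final class ShutdownDelay {
  // Returns the number of milliseconds actually waited.
  static long awaitDrain(BooleanSupplier hasUnread, long minMillis, long maxMillis, long pollMillis)
      throws InterruptedException {
    long start = System.nanoTime();
    while (true) {
      long elapsed = (System.nanoTime() - start) / 1_000_000L;
      if (elapsed >= maxMillis) {
        return elapsed; // give up: the client may not be there at all
      }
      if (elapsed >= minMillis && !hasUnread.getAsBoolean()) {
        return elapsed; // minimum delay served and nothing left to read
      }
      Thread.sleep(Math.min(pollMillis, maxMillis - elapsed));
    }
  }
}
```

With the numbers from the message this would be called as `ShutdownDelay.awaitDrain(probe, 2_000, 30_000, 100)`; the upper bound addresses the concern that no client may be present on the other side.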


-Martin

On 01/05/2017 12:32 PM, Martin Serrano wrote:


All,

I'm encountering a situation on a fast machine where the Kafka log 
aggregation topic is not empty when the system shuts down. The scenario:


  *  log consumer consumes all messages
  * consumer sleeps (500ms) due to empty queue
  * containers exit, posting /final log messages/ about why
  * controller notices containers are down and terminates consumers.
  * consumer is interrupted from sleep but has been canceled, so it
does not get the rest of the messages.

This scenario can be really confusing during development because an 
error may be missed (as in my case) if it falls into the /final log 
messages/.  Before I file a ticket and fix this, I wanted to get some 
feedback.  Looking at 
org.apache.twill.internal.kafka.client.SimpleKafkaConsumer it seems 
this behavior could be intentional given this log message (line 384):


LOG.debug("Unable to fetch messages on {}, kafka consumer 
service shutdown is in progress.", topicPart);


My opinion is that final messages logged by a container are likely to 
be critical in diagnosing errors and that twill should do whatever it 
can to forward them before shutting things down. If there is agreement 
on this I'll file a ticket and fix it.  My general approach would be 
to indicate to the consumer that it is in a shuttingDown state which 
it would use to break from the consume loop once the message set was 
empty.  If this makes sense would we need to support a timeout for the 
maximum amount of time to be in this state before punting on the rest 
of the messages?  My instinct is no, get them all, but given the way 
the code is set up now, perhaps there are good reasons to timeout.
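
The approach described above could look roughly like this. It is a sketch under stated assumptions, not a patch to SimpleKafkaConsumer: a `BlockingQueue` stands in for the Kafka topic, and `DrainingConsumer`, `pollMillis` and `drainTimeoutMillis` are hypothetical names. It includes the optional timeout so both variants of the question can be compared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a consume loop that, once told to shut down, keeps
// fetching until a fetch comes back empty or a drain deadline passes.
final class DrainingConsumer {
  private final BlockingQueue<String> source; // stand-in for the Kafka topic
  private final long pollMillis;
  private final long drainTimeoutMillis;
  private volatile boolean shuttingDown;

  DrainingConsumer(BlockingQueue<String> source, long pollMillis, long drainTimeoutMillis) {
    this.source = source;
    this.pollMillis = pollMillis;
    this.drainTimeoutMillis = drainTimeoutMillis;
  }

  // Signals the loop to finish once the remaining messages are consumed.
  void shutdown() {
    shuttingDown = true;
  }

  List<String> run() throws InterruptedException {
    List<String> received = new ArrayList<>();
    Long deadlineNanos = null; // set when shutdown is first observed
    while (true) {
      String message = source.poll(pollMillis, TimeUnit.MILLISECONDS);
      if (message != null) {
        received.add(message);
      }
      if (!shuttingDown) {
        continue; // normal operation: keep polling
      }
      long now = System.nanoTime();
      if (deadlineNanos == null) {
        deadlineNanos = now + TimeUnit.MILLISECONDS.toNanos(drainTimeoutMillis);
      }
      if (message == null || now - deadlineNanos >= 0) {
        return received; // drained, or out of time
      }
    }
  }
}
```

Passing a very large `drainTimeoutMillis` approximates the "no timeout, get them all" option.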


Thanks,

Martin Serrano





[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802663#comment-15802663
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94861374
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

How about "The default offset to return as the offset to fetch next message 
if no message is consumed in this method"




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94861374
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

How about "The default offset to return as the offset to fetch next message 
if no message is consumed in this method"




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802627#comment-15802627
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94858965
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Is this true? If someone specified `-1L` or `-2L` as the offset, would the 
`startOffset` here be `-1L` or `-2L` or be the actual offset (which won't be 
negative) of the first message in the iterator?
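
To make the second reading in the question concrete: in Kafka's offset request API, `-1L` denotes the latest offset and `-2L` the earliest. One option, sketched below, is to resolve these markers to concrete offsets before the callback is invoked, so that `startOffset` is never negative. `OffsetResolver` and its parameters are hypothetical, and this is not necessarily what the pull request does.

```java
// Hypothetical sketch: translate Kafka's special offset markers into concrete
// offsets so that a callback never sees a negative startOffset.
final class OffsetResolver {
  static final long LATEST = -1L;   // Kafka's marker for the latest offset
  static final long EARLIEST = -2L; // Kafka's marker for the earliest offset

  // earliest and latest are the partition's current bounds, obtained elsewhere.
  static long resolve(long requested, long earliest, long latest) {
    if (requested == EARLIEST) {
      return earliest;
    }
    if (requested == LATEST) {
      return latest;
    }
    return requested;
  }
}
```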




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94858965
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Is this true? If someone specified `-1L` or `-2L` as the offset, would the 
`startOffset` here be `-1L` or `-2L` or be the actual offset (which won't be 
negative) of the first message in the iterator?




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802035#comment-15802035
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94691443
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the current message to be consumed.
--- End diff --

This is not exactly clear. The `startOffset` is the offset used to fetch 
the messages, right?




[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback

2017-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802036#comment-15802036
 ] 

ASF GitHub Bot commented on TWILL-199:
--

Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94817038
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -189,6 +194,49 @@ public void finished() {
   }
 
   @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClient";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
+      new KafkaConsumer.MessageCallback() {
+        @Override
+        public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
+          if (messages.hasNext()) {
+            offsetQueue.offer(startOffset);
+            FetchedMessage message = messages.next();
+            LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+            return message.getNextOffset() + 1;
+          }
+          return startOffset;
+        }
+
+        @Override
+        public void finished() {
+          stopLatch.countDown();
+        }
+      });
+    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
+    for (int i = 0; i < 30; i += 2) {
+      Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
+    }
+    Assert.assertEquals(0, offsetQueue.size());
--- End diff --

should do a `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` 
instead.




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94817038
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -189,6 +194,49 @@ public void finished() {
   }
 
   @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClient";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
+      new KafkaConsumer.MessageCallback() {
+        @Override
+        public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
+          if (messages.hasNext()) {
+            offsetQueue.offer(startOffset);
+            FetchedMessage message = messages.next();
+            LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+            return message.getNextOffset() + 1;
+          }
+          return startOffset;
+        }
+
+        @Override
+        public void finished() {
+          stopLatch.countDown();
+        }
+      });
+    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
+    for (int i = 0; i < 30; i += 2) {
+      Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
+    }
+    Assert.assertEquals(0, offsetQueue.size());
--- End diff --

should do a `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` 
instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94691443
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the current message to be consumed.
--- End diff --

This is not exactly clear. The `startOffset` is the offset used to fetch 
the messages, right?




Desired behavior at shutdown

2017-01-05 Thread Martin Serrano

All,

I'm encountering a situation on a fast machine where the Kafka log 
aggregation topic is not empty when the system shuts down. The scenario:


 * log consumer consumes all messages
 * consumer sleeps (500ms) due to empty queue
 * containers exit, posting /final log messages/ about why
 * controller notices containers are down and terminates consumers
 * consumer is interrupted from sleep but has already been canceled, so it
   does not get the rest of the messages

This scenario can be really confusing during development because an 
error may be missed (as in my case) if it falls into the /final log 
messages/.  Before I file a ticket and fix this, I wanted to get some 
feedback.  Looking at 
org.apache.twill.internal.kafka.client.SimpleKafkaConsumer it seems this 
behavior could be intentional given this log message (line 384):


    LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);


My opinion is that the final messages logged by a container are likely to be 
critical in diagnosing errors and that Twill should do whatever it can 
to forward them before shutting things down. If there is agreement on 
this, I'll file a ticket and fix it.  My general approach would be to 
indicate to the consumer that it is in a shuttingDown state, which it 
would use to break from the consume loop once the message set was 
empty.  If this makes sense, would we need to support a timeout for the 
maximum amount of time to be in this state before punting on the rest of 
the messages? My instinct is no, get them all, but given the way the 
code is set up now, perhaps there are good reasons to time out.
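
To make the proposal concrete, here is a minimal sketch of such a drain-before-exit loop. It is not SimpleKafkaConsumer: a `BlockingQueue` stands in for the Kafka topic, and the class `DrainingConsumerSketch`, its `shutDown()` method and the `maxDrainMillis` bound are all hypothetical names chosen for illustration. The loop exits on the first empty fetch after shutdown was requested, and the optional bound covers the case where messages keep arriving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a drain-before-exit consume loop; not Twill code. */
final class DrainingConsumerSketch implements Runnable {
  private final BlockingQueue<String> topic;   // stand-in for the Kafka log topic
  private final long maxDrainMillis;           // assumed upper bound on draining time
  private final List<String> forwarded = new ArrayList<>();
  private volatile boolean shuttingDown;

  DrainingConsumerSketch(BlockingQueue<String> topic, long maxDrainMillis) {
    this.topic = topic;
    this.maxDrainMillis = maxDrainMillis;
  }

  /** Called by the controller instead of cancelling the consumer outright. */
  void shutDown() {
    shuttingDown = true;
  }

  /** Returns a copy of the messages forwarded so far. */
  synchronized List<String> forwarded() {
    return new ArrayList<>(forwarded);
  }

  @Override
  public void run() {
    boolean draining = false;
    long drainStart = 0;
    while (true) {
      // Read the flag BEFORE fetching: anything published before shutDown()
      // was called is then visible to the fetch that follows.
      boolean stopping = shuttingDown;
      String message = fetch();
      if (message != null) {
        synchronized (this) {
          forwarded.add(message);
        }
      }
      if (!stopping) {
        continue;
      }
      if (message == null) {
        return;                        // empty fetch while shutting down: drained
      }
      if (!draining) {
        draining = true;
        drainStart = System.nanoTime();
      } else if (System.nanoTime() - drainStart >= TimeUnit.MILLISECONDS.toNanos(maxDrainMillis)) {
        return;                        // give up on whatever is left
      }
    }
  }

  private String fetch() {
    try {
      return topic.poll(50, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // An interrupt only cuts the wait short; still look for a pending message.
      return topic.poll();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    DrainingConsumerSketch consumer = new DrainingConsumerSketch(topic, 5000);
    Thread thread = new Thread(consumer);
    thread.start();
    topic.offer("container exited: final log message");
    consumer.shutDown();
    thread.join();
    System.out.println(consumer.forwarded());
  }
}
```

In this sketch, dropping the time bound is just a matter of passing a very large `maxDrainMillis`, so the timeout question becomes a configuration choice rather than a structural one.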


Thanks,

Martin Serrano