[jira] [Commented] (TWILL-63) Speed up application launch time
[ https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803888#comment-15803888 ]

ASF GitHub Bot commented on TWILL-63:

GitHub user chtyim opened a pull request:

    https://github.com/apache/twill/pull/21

    (TWILL-63) Speed up application launch time

    The general approach is better jar file management, plus caching and reusing the jar files created through class dependency tracing. The changes break down as follows:

    1. Refactor jar generation
       - One jar containing the TwillLauncher (launcher.jar), created through dependency tracing.
         - This jar is the same for all applications.
       - One jar containing all Twill classes (twill.jar), created through dependency tracing.
         - This jar is the same for all applications.
       - One jar containing the application class, created through dependency tracing.
         - This jar is generated based on the application being launched. It is reusable when launching the same app multiple times.
       - One jar containing user resources set up through TwillPreparer.
         - This jar is not reused between apps.
       - One jar containing runtime config needed by Twill
         - logback.xml, JVM options, environment, classpaths, etc.
    2. Let YARN expand jars when localizing them to containers, instead of expanding them programmatically
       - This saves jar-expansion time when multiple containers run on the same host.
    3. Introduce a new configuration, "twill.location.cache.dir", to enable jar caching and reuse
       - Currently only launcher.jar, twill.jar and the application jar are cached and reused when possible.
       - Cache cleanup logic removes files in the cache directory that are no longer used by any application.
    4. The ApplicationBundler is improved to allow more flexible usage.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/chtyim/twill feature/twill-63

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/twill/pull/21.patch

    To close this pull request, make a commit to your master/trunk branch
    with (at least) the following in the commit message:

        This closes #21

    commit 786f3c6e075c15d929b43e322f8a869963a95b81
    Author: Terence Yim
    Date:   2016-12-07T01:05:11Z

        (TWILL-63) Speed up application launch time

> Speed up application launch time
>
>
>                 Key: TWILL-63
>                 URL: https://issues.apache.org/jira/browse/TWILL-63
>             Project: Apache Twill
>          Issue Type: Improvement
>          Components: yarn
>    Affects Versions: 0.2.0-incubating
>            Reporter: Terence Yim
>            Assignee: Terence Yim
>             Fix For: 0.10.0
>
>
> Currently when launching an application, two new jars are always created
> locally, one for the AM (appMaster.jar) and one for the container (container.jar),
> and copied to HDFS before submitting the application. The jar files can
> potentially be big, and if they haven't changed, they shouldn't need to be
> copied to HDFS again.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
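The jar reuse described in items 1 and 3 rests on one idea: a jar that is byte-for-byte identical to one already uploaded does not need to be copied again. Below is a minimal sketch of that idea, not the PR's implementation. It assumes a local cache directory (standing in for whatever "twill.location.cache.dir" points to, which may be a distributed-filesystem location) and uses SHA-256 content hashes as cache keys; `JarCache` and `getOrAdd` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical content-addressed jar cache: identical jars map to one cached copy. */
final class JarCache {
  private final Path cacheDir;

  JarCache(Path cacheDir) throws IOException {
    this.cacheDir = Files.createDirectories(cacheDir);
  }

  /** Returns the cached copy of the jar, copying it into the cache only on a miss. */
  Path getOrAdd(Path jar) throws IOException {
    Path cached = cacheDir.resolve(sha256Hex(jar) + ".jar");
    if (Files.exists(cached)) {
      return cached;  // cache hit: same content was cached before, reuse it
    }
    // Copy to a temp file first, then rename atomically so no reader sees a partial jar.
    Path tmp = Files.createTempFile(cacheDir, "jar", ".tmp");
    Files.copy(jar, tmp, StandardCopyOption.REPLACE_EXISTING);
    Files.move(tmp, cached, StandardCopyOption.ATOMIC_MOVE);
    return cached;
  }

  private static String sha256Hex(Path file) throws IOException {
    MessageDigest digest;
    try {
      digest = MessageDigest.getInstance("SHA-256");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);  // every JVM is required to support SHA-256
    }
    try (InputStream in = Files.newInputStream(file)) {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        digest.update(buf, 0, n);
      }
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : digest.digest()) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}
```

Keying by content means launching the same app twice resolves to the same entry, while a changed jar gets a new one. Cleanup of entries no longer used by any application (the other half of item 3) is not shown.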
[GitHub] twill pull request #21: (TWILL-63) Speed up application launch time
GitHub user chtyim opened a pull request:

    https://github.com/apache/twill/pull/21

    (TWILL-63) Speed up application launch time

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/chtyim/twill feature/twill-63

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/twill/pull/21.patch

    To close this pull request, make a commit to your master/trunk branch
    with (at least) the following in the commit message:

        This closes #21

    commit 786f3c6e075c15d929b43e322f8a869963a95b81
    Author: Terence Yim
    Date:   2016-12-07T01:05:11Z

        (TWILL-63) Speed up application launch time

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
Retrieving failure when app terminates abnormally
Hello there,

I am trying to use the async APIs to start/stop a Twill-managed YARN application. I am using the onRunning() and onTerminated() APIs for this, but I don't see a way of retrieving the error in case of failure:

    public void onTerminated(final Runnable runnable, Executor executor) {
      this.addListener(new ServiceListenerAdapter() {
        public void failed(State from, Throwable failure) {
          runnable.run();
        }

        public void terminated(State from) {
          runnable.run();
        }
      }, executor);
    }

Is there a way of retrieving the "Throwable failure"? Or am I using the wrong APIs?

Thanks
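One way to keep the cause is to record it inside `failed()` instead of collapsing both callbacks into the same `Runnable`. The sketch below is hypothetical and not a Twill API: `TerminationListener` mirrors the `failed(State, Throwable)` and `terminated(State)` pair from the snippet (with the state argument dropped), and a `CompletableFuture` carries either normal completion or the `Throwable` to a callback that receives `null` on success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical listener with the same two callbacks as in the question. */
interface TerminationListener {
  void failed(Throwable failure);

  void terminated();
}

/** Records how the application ended so callers can see the failure cause. */
final class TerminationResult implements TerminationListener {
  private final CompletableFuture<Void> outcome = new CompletableFuture<>();

  @Override
  public void failed(Throwable failure) {
    outcome.completeExceptionally(failure);  // keep the cause instead of dropping it
  }

  @Override
  public void terminated() {
    outcome.complete(null);
  }

  /** The callback receives null on normal termination, or the failure cause. */
  void onTerminated(Consumer<Throwable> callback, Executor executor) {
    outcome.whenCompleteAsync((ignored, failure) -> callback.accept(failure), executor);
  }
}
```

Completing a future rather than invoking the callback directly also means a callback registered after the application has already terminated still fires, with the same cause.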
[jira] [Assigned] (TWILL-63) Speed up application launch time
[ https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terence Yim reassigned TWILL-63:

    Assignee: Terence Yim  (was: Shankar Selvam)
[jira] [Updated] (TWILL-63) Speed up application launch time
[ https://issues.apache.org/jira/browse/TWILL-63?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terence Yim updated TWILL-63:

    Fix Version/s: 0.10.0
                   (was: 0.9.0)
Re: Desired behavior at shutdown
Hi Martin,

I do agree that the AM should only shut down the embedded Kafka server once all the controllers have seen all the logs. However, the difficulty is in how the AM would know that. The Twill controller uses the simple Kafka API instead of the higher-level one, because the higher-level one involves checkpointing to ZK, and we don't want many running Twill apps to put a heavy load on ZK.

Do you have any suggestions on how to do that?

Thanks,
Terence

> On Jan 5, 2017, at 2:42 PM, Martin Serrano wrote:
>
> Actually, after further investigation, I realize the server side has to be
> dealt with, because it shuts down the Kafka broker before all the
> messages are read from it. I see that there is a 2-second delay for clients
> to pull what they can first. What would folks think about an algorithm that
> checked the topic for unread messages and used a longer timeout (say 30s) as
> long as there were still messages to be received? Is there a concern that the
> client may not be present on the other side, in which case delaying the AM
> shutdown would be undesirable?
>
> -Martin
>
>> On 01/05/2017 12:32 PM, Martin Serrano wrote:
>>
>> All,
>>
>> I'm encountering a situation on a fast machine where the Kafka log
>> aggregation topic is not empty when the system shuts down. The scenario:
>>
>> * log consumer consumes all messages
>> * consumer sleeps (500ms) due to empty queue
>> * containers exit, posting /final log messages/ about why
>> * controller notices containers are down and terminates consumers
>> * consumer is interrupted from sleep but has been canceled, so it
>>   does not get the rest of the messages
>>
>> This scenario can be really confusing during development because an error
>> may be missed (as in my case) if it falls into the /final log messages/.
>> Before I file a ticket and fix this, I wanted to get some feedback. Looking
>> at org.apache.twill.internal.kafka.client.SimpleKafkaConsumer, it seems this
>> behavior could be intentional given this log message (line 384):
>>
>>    LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
>>
>> My opinion is that final messages logged by a container are likely to be
>> critical in diagnosing errors and that Twill should do whatever it can to
>> forward them before shutting things down. If there is agreement on this, I'll
>> file a ticket and fix it. My general approach would be to indicate to the
>> consumer that it is in a shuttingDown state, which it would use to break from
>> the consume loop once the message set was empty. If this makes sense, would
>> we need to support a timeout for the maximum amount of time to be in this
>> state before punting on the rest of the messages? My instinct is no, get
>> them all, but given the way the code is set up now, perhaps there are good
>> reasons to time out.
>>
>> Thanks,
>>
>> Martin Serrano
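Martin's server-side proposal (keep waiting while unread messages remain, but cap the wait, say at 30s) amounts to a bounded poll. A minimal sketch follows, assuming the AM had some way to test whether the log topic has been fully consumed. That check is modeled here as a hypothetical `BooleanSupplier`, and providing it is exactly the open problem Terence raises, since the simple Kafka API keeps no consumer checkpoints in ZK. `DrainWait` and `awaitDrained` are illustrative names.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical bounded wait before shutting down the embedded Kafka broker. */
final class DrainWait {
  private DrainWait() {}

  /**
   * Polls {@code drained} until it returns true or {@code timeout} elapses.
   *
   * @return true if the topic was drained before the deadline, false if we gave up
   */
  static boolean awaitDrained(BooleanSupplier drained, long timeout, TimeUnit unit,
                              long pollMillis) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (!drained.getAsBoolean()) {
      // Overflow-safe comparison of nanoTime values.
      if (System.nanoTime() - deadline >= 0) {
        return false;  // cap reached: shut the broker down anyway
      }
      Thread.sleep(pollMillis);
    }
    return true;
  }
}
```

With `awaitDrained(check, 30, TimeUnit.SECONDS, 500)`, a fast shutdown still completes as soon as the check passes, and an absent client costs at most the cap.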
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802862#comment-15802862 ]

ASF GitHub Bot commented on TWILL-199:

Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874821

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    If one specified `-1L` or `-2L`, but no message is consumed in the method, then `-1L` or `-2L` still has to be returned.

> Get next offset and handle offset error in KafkaConsumer.MessageCallback
>
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to
> {{Long onReceived(Iterator<FetchedMessage> messages)}}, so that it can provide
> additional functionality:
> 1. Return the next offset to be fetched
> 2. Handle offset non-existence or offset mismatch errors and take action on
> the error
> For backward compatibility, this method returns null when it doesn't need
> to provide the next offset.
> In the concrete implementation, an implementation of a new interface,
> {{KafkaOffsetProvider}}, can be added as a member of
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. {{KafkaOffsetProvider}} also has methods that
> provide the following functionality:
> 1. Fetch the earliest/latest offset in Kafka
> 2. Find the offset of a message whose timestamp equals the given
> timestamp in Kafka
> For backward compatibility, if no {{KafkaOffsetProvider}} instance is
> provided, its default value will be null and none of its methods will be
> called.
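The proposed return-value contract can be sketched as follows. Everything here is a hypothetical stand-in, not Twill code: `Message` plays the role of `FetchedMessage` (with a `nextOffset` field in place of `getNextOffset()`), `OffsetCallback` the changed `MessageCallback`, and `OffsetDriver.deliver` one step of the consumer loop. A `null` return keeps the old behavior of continuing after the last delivered message; a non-null value tells the consumer where the next fetch starts.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for FetchedMessage. */
final class Message {
  final long offset;
  final long nextOffset;

  Message(long offset) {
    this.offset = offset;
    this.nextOffset = offset + 1;
  }
}

/** Hypothetical stand-in for the proposed Long-returning callback. */
interface OffsetCallback {
  /** Returns the next offset to fetch, or null to keep the default behavior. */
  Long onReceived(Iterator<Message> messages);
}

final class OffsetDriver {
  private OffsetDriver() {}

  /** Delivers one fetched batch and returns the offset the next fetch should use. */
  static long deliver(List<Message> batch, long fetchOffset, OffsetCallback callback) {
    Long requested = callback.onReceived(batch.iterator());
    if (requested != null) {
      return requested;  // the callback chose where to continue (skip, rewind, ...)
    }
    // Backward-compatible default: continue right after the last message in the batch.
    return batch.isEmpty() ? fetchOffset : batch.get(batch.size() - 1).nextOffset;
  }
}
```

Returning a boxed `Long` is what makes the "null means no opinion" default possible; a primitive `long` would force every callback to compute an offset.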
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874821

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    If one specified `-1L` or `-2L`, but no message is consumed in the method, then `-1L` or `-2L` still has to be returned.
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802857#comment-15802857 ]

ASF GitHub Bot commented on TWILL-199:

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874547

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    or you change the code in a way that this statement is correct.
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94874547

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    or you change the code in a way that this statement is correct.
Re: Desired behavior at shutdown
Actually, after further investigation, I realize the server side has to be dealt with, because it shuts down the Kafka broker before all the messages are read from it. I see that there is a 2-second delay for clients to pull what they can first. What would folks think about an algorithm that checked the topic for unread messages and used a longer timeout (say 30s) as long as there were still messages to be received? Is there a concern that the client may not be present on the other side, in which case delaying the AM shutdown would be undesirable?

-Martin

On 01/05/2017 12:32 PM, Martin Serrano wrote:

> All,
>
> I'm encountering a situation on a fast machine where the Kafka log
> aggregation topic is not empty when the system shuts down. The scenario:
>
> * log consumer consumes all messages
> * consumer sleeps (500ms) due to empty queue
> * containers exit, posting /final log messages/ about why
> * controller notices containers are down and terminates consumers
> * consumer is interrupted from sleep but has been canceled, so it
>   does not get the rest of the messages
>
> This scenario can be really confusing during development because an error
> may be missed (as in my case) if it falls into the /final log messages/.
> Before I file a ticket and fix this, I wanted to get some feedback. Looking
> at org.apache.twill.internal.kafka.client.SimpleKafkaConsumer, it seems this
> behavior could be intentional given this log message (line 384):
>
>    LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
>
> My opinion is that final messages logged by a container are likely to be
> critical in diagnosing errors and that Twill should do whatever it can to
> forward them before shutting things down. If there is agreement on this, I'll
> file a ticket and fix it. My general approach would be to indicate to the
> consumer that it is in a shuttingDown state, which it would use to break from
> the consume loop once the message set was empty. If this makes sense, would
> we need to support a timeout for the maximum amount of time to be in this
> state before punting on the rest of the messages? My instinct is no, get
> them all, but given the way the code is set up now, perhaps there are good
> reasons to time out.
>
> Thanks,
>
> Martin Serrano
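Martin's client-side approach (a shuttingDown state that ends the consume loop once a fetch comes back empty) can be sketched with a `BlockingQueue` standing in for the log topic. This is a hypothetical illustration, not `SimpleKafkaConsumer`; the class name, constructor, and poll interval are assumptions. It deliberately has no timeout, matching the "get them all" instinct.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical consume loop that drains the topic before honoring shutdown. */
final class DrainingConsumer implements Runnable {
  private final BlockingQueue<String> topic;  // stands in for the Kafka log topic
  private final Consumer<String> sink;        // e.g. forwards to the client's log handler
  private final long pollMillis;              // idle wait per empty fetch (500ms in the thread)
  private volatile boolean shuttingDown;

  DrainingConsumer(BlockingQueue<String> topic, Consumer<String> sink, long pollMillis) {
    this.topic = topic;
    this.sink = sink;
    this.pollMillis = pollMillis;
  }

  /** Requests shutdown; the loop keeps consuming until a fetch comes back empty. */
  void shutDown() {
    shuttingDown = true;
  }

  @Override
  public void run() {
    try {
      while (true) {
        // Read the flag before fetching: if shutdown was already requested and this
        // fetch is still empty, everything published before shutDown() has been seen.
        boolean stopping = shuttingDown;
        String message = topic.poll(pollMillis, TimeUnit.MILLISECONDS);
        if (message != null) {
          sink.accept(message);
        } else if (stopping) {
          return;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // hard stop: remaining messages are dropped
    }
  }
}
```

Reading the flag before the fetch avoids the race in the scenario above, where a consumer sees an empty fetch, then learns of shutdown, and exits while final messages are still arriving. If a cap turns out to be needed, the loop could also return once a deadline passes, trading completeness for a bounded shutdown.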
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802663#comment-15802663 ]

ASF GitHub Bot commented on TWILL-199:

Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94861374

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    How about "The default offset to return as the offset to fetch next message if no message is consumed in this method"
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94861374

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    How about "The default offset to return as the offset to fetch next message if no message is consumed in this method"
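Under the wording proposed here, `startOffset` is simply the value to hand back when the callback consumes nothing, which also covers the earlier point that `-1L` or `-2L` must be returned unchanged in that case. A hypothetical sketch follows (`Fetched`, `StartOffsetCallback`, and `BoundedCallback` are stand-ins, not the PR's classes) of a callback with the `long onReceived(long startOffset, Iterator<...>)` shape used in the PR's test, processing at most a fixed number of messages per call.

```java
import java.util.Iterator;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a fetched message: its offset and the offset after it. */
final class Fetched {
  final long offset;
  final long nextOffset;

  Fetched(long offset, long nextOffset) {
    this.offset = offset;
    this.nextOffset = nextOffset;
  }
}

/** Hypothetical stand-in for the revised callback signature under review. */
interface StartOffsetCallback {
  /**
   * @param startOffset the default offset to return as the offset to fetch the next
   *     message if no message is consumed in this method
   * @param messages the newly fetched messages
   * @return the offset to fetch the next message from
   */
  long onReceived(long startOffset, Iterator<Fetched> messages);
}

/** Consumes at most {@code maxPerCall} messages per invocation. */
final class BoundedCallback implements StartOffsetCallback {
  private final int maxPerCall;
  private final LongConsumer processed;

  BoundedCallback(int maxPerCall, LongConsumer processed) {
    this.maxPerCall = maxPerCall;
    this.processed = processed;
  }

  @Override
  public long onReceived(long startOffset, Iterator<Fetched> messages) {
    long next = startOffset;  // returned as-is, even -1L or -2L, if nothing is consumed
    for (int i = 0; i < maxPerCall && messages.hasNext(); i++) {
      Fetched message = messages.next();
      processed.accept(message.offset);
      next = message.nextOffset;  // resume right after the last consumed message
    }
    return next;
  }
}
```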
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802627#comment-15802627 ]

ASF GitHub Bot commented on TWILL-199:

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94858965

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    Is this true? If someone specified `-1L` or `-2L` as the offset, would the `startOffset` here be `-1L` or `-2L`, or would it be the actual offset (which won't be negative) of the first message in the iterator?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94858965

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    Is this true? If someone specified `-1L` or `-2L` as the offset, would the `startOffset` here be `-1L` or `-2L`, or would it be the actual offset (which won't be negative) of the first message in the iterator?
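The question hinges on Kafka's sentinel offsets: in the Kafka 0.8 offset request API, `-1L` means "latest" and `-2L` means "earliest". One way to make the original javadoc ("offset of the first message") true would be to resolve the sentinel to a concrete offset before invoking the callback. Below is a hypothetical helper, assuming the consumer already knows the partition's earliest and latest offsets; `OffsetResolver` is not a Twill or Kafka class.

```java
/** Hypothetical helper that turns a requested (possibly sentinel) offset into a real one. */
final class OffsetResolver {
  static final long LATEST = -1L;    // Kafka 0.8 OffsetRequest.LatestTime()
  static final long EARLIEST = -2L;  // Kafka 0.8 OffsetRequest.EarliestTime()

  private OffsetResolver() {}

  /** Maps a requested offset to a concrete, non-negative offset in the partition. */
  static long resolve(long requested, long earliest, long latest) {
    if (requested == LATEST) {
      return latest;
    }
    if (requested == EARLIEST) {
      return earliest;
    }
    if (requested < 0) {
      throw new IllegalArgumentException("Invalid offset: " + requested);
    }
    return requested;
  }
}
```

The trade-off is the one discussed in this thread: resolving up front makes `startOffset` always a real offset, while passing the sentinel through means the callback must hand `-1L` or `-2L` back unchanged when it consumes nothing.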
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802035#comment-15802035 ]

ASF GitHub Bot commented on TWILL-199:

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94691443

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the current message to be consumed.
    --- End diff --

    This is not exactly clear. The `startOffset` is the offset used to fetch the messages, right?
[jira] [Commented] (TWILL-199) Get next offset and handle offset error in KafkaConsumer.MessageCallback
[ https://issues.apache.org/jira/browse/TWILL-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15802036#comment-15802036 ]

ASF GitHub Bot commented on TWILL-199:
--------------------------------------

Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94817038

    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -189,6 +194,49 @@ public void finished() {
         }
     
         @Test
    +  public void testKafkaClientSkipNext() throws Exception {
    +    String topic = "testClient";
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    +    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
    +      new KafkaConsumer.MessageCallback() {
    +        @Override
    +        public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
    +          if (messages.hasNext()) {
    +            offsetQueue.offer(startOffset);
    +            FetchedMessage message = messages.next();
    +            LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +            return message.getNextOffset() + 1;
    +          }
    +          return startOffset;
    +        }
    +
    +        @Override
    +        public void finished() {
    +          stopLatch.countDown();
    +        }
    +      });
    +    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
    +    for (int i = 0; i < 30; i += 2) {
    +      Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
    +    }
    +    Assert.assertEquals(0, offsetQueue.size());
    --- End diff --

    Should use `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` instead.

> Get next offset and handle offset error in KafkaConsumer.MessageCallback
> ------------------------------------------------------------------------
>
>                 Key: TWILL-199
>                 URL: https://issues.apache.org/jira/browse/TWILL-199
>             Project: Apache Twill
>          Issue Type: Improvement
>            Reporter: Chengfeng Mao
>
> The method {{void onReceived(Iterator<FetchedMessage> messages)}} in
> {{KafkaConsumer.MessageCallback}} can be made more flexible by changing it to
> {{Long onReceived(Iterator<FetchedMessage> messages)}} so that it can provide
> additional functionality:
> 1. To return the next offset to be fetched
> 2. To handle offset non-existence or offset mismatch errors and take action on
> the error
> This method will return null for backward compatibility when it doesn't need
> to provide the next offset.
> In a concrete implementation, a class implementing a new interface
> {{KafkaOffsetProvider}} can be added as a member of
> {{KafkaConsumer.MessageCallback}} to perform the offset error handling and
> provide the next offset. In addition, {{KafkaOffsetProvider}} has methods to
> provide the following functionality:
> 1. To fetch the earliest/latest offset in Kafka
> 2. To find the offset of a message with a timestamp equal to the given
> timestamp in Kafka
> For backward compatibility, if a {{KafkaOffsetProvider}} instance is not
> provided, its default value will be null and none of its methods will be
> called.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
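The reviewer's suggestion to replace `Assert.assertEquals(0, offsetQueue.size())` with `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` rests on a timing difference: `size()` samples the queue at a single instant, while `poll(timeout)` waits, so an unexpected offset that the consumer callback delivers slightly late still makes the assertion fail. The sketch below illustrates this with plain `java.util.concurrent` classes and no Kafka; the class name `EmptyQueueCheck`, the delays, and the late offset `30L` are hypothetical choices for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: why poll(timeout) is a stronger "no more offsets" check than size(). */
final class EmptyQueueCheck {

  /**
   * Schedules one extra offset to arrive after lateMillis, then applies both checks.
   * Returns {size() check passes, poll(pollMillis) check passes}.
   */
  static boolean[] checksPassing(long lateMillis, long pollMillis) {
    BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    ScheduledExecutorService callbackThread = Executors.newSingleThreadScheduledExecutor();
    try {
      // Simulates the consume callback offering an unexpected offset a little later.
      callbackThread.schedule(() -> { offsetQueue.offer(30L); }, lateMillis, TimeUnit.MILLISECONDS);

      // Like Assert.assertEquals(0, offsetQueue.size()): looks once, so it misses late arrivals.
      boolean sizeCheckPasses = offsetQueue.size() == 0;

      // Like Assert.assertNull(offsetQueue.poll(...)): waits, so a late arrival makes it fail.
      boolean pollCheckPasses = offsetQueue.poll(pollMillis, TimeUnit.MILLISECONDS) == null;

      return new boolean[] {sizeCheckPasses, pollCheckPasses};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      callbackThread.shutdownNow();
    }
  }
}
```

With a 300 ms late delivery and a 2 s poll, `checksPassing(300, 2000)` reports that the `size()` check passes (misleadingly) while the `poll` check catches the extra offset.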
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94691443

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the current message to be consumed.
    --- End diff --

    This is not exactly clear. The `startOffset` is the offset used to fetch the messages, right?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
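To make the `startOffset` question concrete, here is a minimal simulation of the contract as the `testKafkaClientSkipNext` test in this PR appears to use it: `startOffset` is the offset the batch was fetched from, and the return value is the offset to fetch next. It does not use Twill; `OffsetCallbackSketch` and its `Message` and `Callback` types are hypothetical stand-ins for `FetchedMessage` and `KafkaConsumer.MessageCallback`, and the batch size is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical simulation of the proposed onReceived(startOffset, messages) contract. */
final class OffsetCallbackSketch {

  /** Stand-in for FetchedMessage: a message stored at a given offset. */
  static final class Message {
    final long offset;

    Message(long offset) {
      this.offset = offset;
    }

    long getNextOffset() {
      return offset + 1;
    }
  }

  /** Stand-in for the callback as changed in this PR. */
  interface Callback {
    /**
     * @param startOffset the offset this batch was fetched from
     * @return the offset to fetch from next
     */
    long onReceived(long startOffset, Iterator<Message> messages);
  }

  /** Fetches up to batchSize messages at a time from a log holding offsets [0, logSize). */
  static void consume(long logSize, int batchSize, Callback callback) {
    long fetchOffset = 0;
    while (fetchOffset < logSize) {
      long end = Math.min(fetchOffset + batchSize, logSize);
      List<Message> batch = new ArrayList<>();
      for (long o = fetchOffset; o < end; o++) {
        batch.add(new Message(o));
      }
      long next = callback.onReceived(fetchOffset, batch.iterator());
      if (next <= fetchOffset) {
        break; // no progress; a real consumer would refetch, this simulation just stops
      }
      fetchOffset = next;
    }
  }

  /** Replays the test's callback: record startOffset, consume one message, skip the next. */
  static List<Long> skipNextStartOffsets(long logSize, int batchSize) {
    List<Long> seen = new ArrayList<>();
    consume(logSize, batchSize, (startOffset, messages) -> {
      if (messages.hasNext()) {
        seen.add(startOffset);
        return messages.next().getNextOffset() + 1;
      }
      return startOffset;
    });
    return seen;
  }
}
```

For offsets 0 to 29, `skipNextStartOffsets(30, 10)` records 0, 2, ..., 28 (15 start offsets), matching the test's expectation that every second offset is queued.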
Desired behavior at shutdown
All,

I'm encountering a situation on a fast machine where the Kafka log aggregation topic is not empty when the system shuts down. The scenario:

* the log consumer consumes all messages
* the consumer sleeps (500ms) because the queue is empty
* containers exit, posting /final log messages/ about why
* the controller notices the containers are down and terminates the consumers
* the consumer is interrupted from its sleep, but it has been canceled, so it does not get the rest of the messages

This scenario can be really confusing during development because an error may be missed (as in my case) if it falls into the /final log messages/.

Before I file a ticket and fix this, I wanted to get some feedback. Looking at org.apache.twill.internal.kafka.client.SimpleKafkaConsumer, it seems this behavior could be intentional, given this log message (line 384):

    LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);

My opinion is that the final messages logged by a container are likely to be critical in diagnosing errors, and that Twill should do whatever it can to forward them before shutting things down. If there is agreement on this, I'll file a ticket and fix it.

My general approach would be to indicate to the consumer that it is in a shuttingDown state, which it would use to break out of the consume loop once the fetched message set is empty. If this makes sense, would we need to support a timeout for the maximum amount of time to stay in this state before punting on the rest of the messages? My instinct is no, get them all, but given the way the code is set up now, perhaps there are good reasons to time out.

Thanks,
Martin Serrano
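The proposed approach (a shuttingDown state that ends the consume loop only once a fetch comes back empty) could look roughly like the following. This is a hypothetical sketch, not Twill's `SimpleKafkaConsumer`: a `BlockingQueue` stands in for the Kafka topic, and `DrainOnShutdownSketch`, `requestShutdown()` and `demo()` are invented names. The flag is read before each fetch, so an empty fetch after the request means every message published before the request has already been delivered; the consumer thread is never interrupted, it simply finishes its current sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer that drains its source before stopping (not Twill's SimpleKafkaConsumer). */
final class DrainOnShutdownSketch {
  private final BlockingQueue<String> topic; // stands in for the Kafka log topic
  private final long emptySleepMillis;       // back-off when a fetch returns nothing
  private final List<String> delivered = new ArrayList<>();
  private volatile boolean shuttingDown;

  DrainOnShutdownSketch(BlockingQueue<String> topic, long emptySleepMillis) {
    this.topic = topic;
    this.emptySleepMillis = emptySleepMillis;
  }

  /** Called by the controller instead of cancelling or interrupting the consumer. */
  void requestShutdown() {
    shuttingDown = true;
  }

  /** Consume loop: exits only when shutdown was requested AND a following fetch is empty. */
  void run() throws InterruptedException {
    while (true) {
      // Read the flag BEFORE fetching: if it is already set, everything published
      // before requestShutdown() is visible to this fetch.
      boolean stopWhenEmpty = shuttingDown;
      List<String> batch = new ArrayList<>();
      topic.drainTo(batch);
      if (!batch.isEmpty()) {
        delivered.addAll(batch); // "forward" the log messages
        continue;                // fetch again right away
      }
      if (stopWhenEmpty) {
        return;                  // nothing published before shutdown is left
      }
      TimeUnit.MILLISECONDS.sleep(emptySleepMillis);
    }
  }

  /** Replays the scenario from the post; returns everything the consumer delivered. */
  static List<String> demo() {
    BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    topic.add("startup");
    DrainOnShutdownSketch consumer = new DrainOnShutdownSketch(topic, 500);
    Thread thread = new Thread(() -> {
      try {
        consumer.run();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    thread.start();
    try {
      Thread.sleep(100);                // consumer has drained and is sleeping
      topic.add("final log message 1"); // containers exit and log why
      topic.add("final log message 2");
      consumer.requestShutdown();       // controller notices and stops the consumer
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new ArrayList<>(consumer.delivered);
  }
}
```

`demo()` returns all three messages, including the two published while the consumer was sleeping. A maximum drain time, as raised in the question, could be added by recording a deadline in `requestShutdown()` and also returning once it passes; the sketch leaves that out.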