[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855488#comment-15855488 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:57 AM:

The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of {{deserialize(bytes, collector)}} will need to make sure that all outputs are added to the collector before returning from the method.

Also note that the underlying implementation might differ between 0.8 and 0.9+, mainly due to the different threading models for how partitions are consumed. For 0.8 I think we need to have a separate collector for each subscribed Kafka partition, while in 0.9 we can have a single collector for the whole subtask.

was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of {{deserialize(bytes, collector)}} will need to make sure that all outputs are added to the collector before returning from the method. Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and
> exceptions in the Kafka consumer in order to build a robust application in
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The
> streaming pipeline might want to bail out (which is the current behavior) or
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap-like
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
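The per-partition collector idea in the comment above can be sketched with a minimal, self-contained stand-in. All names here (OutputCollector, BufferingCollector) are illustrative, not the Flink API: each subscribed partition gets its own buffering collector, so a deserialize() implementation never shares mutable state across consuming threads and needs no locking of its own.

```java
import java.util.ArrayList;
import java.util.List;

public class PerPartitionCollectors {

    // Minimal stand-in for the proposed collector interface.
    interface OutputCollector<T> {
        void collect(T record);
    }

    // A simple buffering collector; one instance per subscribed partition.
    static class BufferingCollector<T> implements OutputCollector<T> {
        final List<T> buffer = new ArrayList<>();

        @Override
        public void collect(T record) {
            buffer.add(record);
        }

        // Hand the buffered records over and reset for the next record batch.
        List<T> drain() {
            List<T> out = new ArrayList<>(buffer);
            buffer.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        // Two partitions, two independent collectors: no sharing between
        // the threads that consume them.
        BufferingCollector<String> p0 = new BufferingCollector<>();
        BufferingCollector<String> p1 = new BufferingCollector<>();
        p0.collect("record-from-partition-0");
        p1.collect("record-from-partition-1");
        System.out.println(p0.drain().size() + "," + p1.drain().size()); // prints 1,1
    }
}
```

In the 0.9+ threading model a single collector per subtask would work the same way, just with one instance instead of one per partition.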
[jira] [Assigned] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised
[ https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-5702:
------------------------------------------

    Assignee: Tzu-Li (Gordon) Tai

> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is
> compromised
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-5702
>                 URL: https://issues.apache.org/jira/browse/FLINK-5702
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about
> {{setLogFailuresOnly}}. It should emphasize that if users choose to only
> log failures instead of failing the sink, at-least-once cannot be guaranteed.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
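For context, the producer settings discussed in this ticket look roughly as follows against the 1.2-era connector API. This is a non-runnable configuration sketch: the topic name and bootstrap servers are placeholders, the choice of FlinkKafkaProducer09 is one of several versioned producer classes, and the snippet assumes the flink-connector-kafka dependency is on the classpath.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

FlinkKafkaProducer09<String> producer =
    new FlinkKafkaProducer09<>("my-topic", new SimpleStringSchema(), props);

// For at-least-once, failures must fail the sink (not merely be logged),
// and pending records must be flushed on checkpoints.
producer.setLogFailuresOnly(false);
producer.setFlushOnCheckpoint(true);
```

Setting {{setLogFailuresOnly(true)}} instead would swallow send failures, which is exactly the case this ticket asks the documentation to warn about.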
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855488#comment-15855488 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:47 AM:

The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of {{deserialize(bytes, collector)}} will need to make sure that all outputs are added to the collector before returning from the method.

Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of `deserialize()` will need to make sure that all outputs are added to the collector before returning from the method. Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855488#comment-15855488 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:47 AM:

The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of `deserialize()` will need to make sure that all outputs are added to the collector before returning from the method.

Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation. Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855488#comment-15855488 ]

Tzu-Li (Gordon) Tai commented on FLINK-5583:
--------------------------------------------

The collector can be made thread-safe, yes, depending on the implementation. Also note that for the underlying implementation I think we should have a separate collector for each subscribed Kafka partition. A collector cannot be shared for multiple partitions.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[GitHub] flink pull request #3282: [FLINK-5702] [doc] At-least-once configuration inf...
GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3282

[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

Adds information about the `setLogFailuresOnly` and `setFlushOnCheckpoint` methods with regard to the at-least-once guarantees of the producer, and warns that at-least-once is compromised if configured inappropriately.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5702

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3282.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3282

commit b90a5db99802d8d483ad064f21f8a3fbe42c4076
Author: Tzu-Li (Gordon) Tai
Date:   2017-02-07T06:32:28Z

    [FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised
[ https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855483#comment-15855483 ]

ASF GitHub Bot commented on FLINK-5702:
---------------------------------------

GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3282 [FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Created] (FLINK-5728) FlinkKafkaProducer should flush on checkpoint by default
Tzu-Li (Gordon) Tai created FLINK-5728:
---------------------------------------

             Summary: FlinkKafkaProducer should flush on checkpoint by default
                 Key: FLINK-5728
                 URL: https://issues.apache.org/jira/browse/FLINK-5728
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Tzu-Li (Gordon) Tai

As discussed in FLINK-5702, it might be a good idea to let the FlinkKafkaProducer flush on checkpoints by default. Currently, it is disabled by default. It's a very simple change, but we should think about whether we want to break user behaviour, or provide a proper usage migration.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855478#comment-15855478 ]

Haohui Mai commented on FLINK-5583:
-----------------------------------

The interface looks good to me. It looks like you want to keep the deserialization out of the lock. Is it okay to assume that the collector is thread-safe?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai commented on FLINK-5583:
--------------------------------------------

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

```
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
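Steps 1-4 of the comment above can be sketched in a minimal, self-contained form. All names here (CheckpointLockFlush, Deserializer, BufferingCollector) are hypothetical, not Flink internals: deserialization runs outside the lock into a buffer, and the buffer flush plus the offset update then happen as one atomic section under the checkpoint lock, so a checkpoint barrier can never observe the records of an offset half-emitted.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointLockFlush {

    interface OutputCollector<T> { void collect(T record); }

    // Stand-in for the proposed schema interface, String-typed for brevity.
    interface Deserializer {
        void deserialize(byte[] message, OutputCollector<String> out);
    }

    static class BufferingCollector<T> implements OutputCollector<T> {
        final List<T> buffer = new ArrayList<>();

        @Override
        public void collect(T record) { buffer.add(record); }
    }

    final Object checkpointLock = new Object(); // shared with checkpointing
    final List<String> emitted = new ArrayList<>();
    long lastEmittedOffset = -1;

    void processRecord(long offset, byte[] bytes, Deserializer schema) {
        // Steps 1-2: deserialization runs outside the lock; zero or more
        // records are buffered in the collector.
        BufferingCollector<String> collector = new BufferingCollector<>();
        schema.deserialize(bytes, collector);

        // Steps 3-4: flush + offset update form one atomic section under
        // the checkpoint lock; a barrier sees either none or all of them.
        synchronized (checkpointLock) {
            emitted.addAll(collector.buffer);
            lastEmittedOffset = offset;
        }
    }

    public static void main(String[] args) {
        CheckpointLockFlush task = new CheckpointLockFlush();
        // A toy schema that splits one Kafka record into two outputs.
        task.processRecord(32, "a,b".getBytes(), (bytes, out) -> {
            for (String part : new String(bytes).split(",")) {
                out.collect(part);
            }
        });
        System.out.println(task.emitted + " @ offset " + task.lastEmittedOffset); // prints [a, b] @ offset 32
    }
}
```

Because the checkpoint callback would synchronize on the same {{checkpointLock}}, no extra user-facing flush method is needed, which is the point being made in the comment.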
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:21 AM:

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method; some users might just want to drop the corrupt record).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
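The try-catch pattern suggested in the comment above for corrupt records can be sketched as follows. The names are illustrative only (this is not the actual proposed Flink interface), and an in-memory list stands in for the external storage a real implementation would write to, which is where the blocking-call caveat applies.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkippingDeserializationSchema {

    interface OutputCollector<T> { void collect(T record); }

    // Stand-in for external storage of corrupt payloads.
    static final List<byte[]> corruptStore = new ArrayList<>();

    static void deserialize(byte[] message, OutputCollector<Integer> out) {
        try {
            // Toy "schema": the payload must parse as an integer.
            out.collect(Integer.parseInt(new String(message, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            // Corrupt record: divert it aside (a blocking write in a real
            // implementation) and return without emitting, skipping it.
            corruptStore.add(message);
        }
    }

    public static void main(String[] args) {
        List<Integer> good = new ArrayList<>();
        deserialize("42".getBytes(StandardCharsets.UTF_8), good::add);
        deserialize("not-a-number".getBytes(StandardCharsets.UTF_8), good::add);
        System.out.println(good + " corrupt=" + corruptStore.size()); // prints [42] corrupt=1
    }
}
```

Users who just want to drop corrupt records would simply leave the catch block empty instead of writing the bytes anywhere.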
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:20 AM:

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:17 AM:

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

```
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 32.

For the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be ok depending on the frequency of corrupt messages.

What do you think [~wheat9]?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:19 AM:

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method). First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    // the user adds outputs to the collector
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:
1. The consumer starts processing the record with offset 32 (as an example).
2. An internal buffer in the consumer collects the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All records in the buffer are flushed, and offset 32 is updated into the internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. Since checkpointing is synchronized on the same lock, a checkpoint barrier can only come either before or after all the records produced from offset 32.

With the synchronization explained above, we do not need to expose another {{flush}} method to the user. For your use case, in which you want to write corrupt bytes to external storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}. The only limitation is that it must be a blocking call, which might be acceptable depending on the frequency of corrupt messages. What do you think [~wheat9]?
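The four numbered steps in the comment above can be sketched in code. The following is a minimal, self-contained illustration only: {{OutputCollector}}, {{NewDeserializationSchema}}, and {{ToyConsumer}} are hypothetical stand-ins rather than Flink's actual classes, and the checkpoint lock merely models the synchronization contract described in steps 3 and 4.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal collector: the user buffers zero or more records per message.
interface OutputCollector<T> {
    void collect(T record);
}

// Sketch of the proposed flatMap-like schema from FLINK-3679 (name is a placeholder).
interface NewDeserializationSchema<T> {
    void deserialize(byte[] message, OutputCollector<T> collector);
}

// Toy consumer illustrating steps 1-4: all records produced for one offset are
// flushed and the offset is advanced as a single atomic block under the
// checkpoint lock, so a checkpoint barrier observes either all or none of them.
class ToyConsumer<T> {
    private final Object checkpointLock = new Object();
    private final NewDeserializationSchema<T> schema;
    final List<T> emitted = new ArrayList<>();
    long committedOffset = -1;

    ToyConsumer(NewDeserializationSchema<T> schema) {
        this.schema = schema;
    }

    void processRecord(long offset, byte[] bytes) {
        // Step 2: collect the zero or more outputs into an internal buffer.
        List<T> buffer = new ArrayList<>();
        schema.deserialize(bytes, buffer::add);
        // Steps 3-4: flush + offset update, atomic w.r.t. checkpointing,
        // which would synchronize on the same lock.
        synchronized (checkpointLock) {
            emitted.addAll(buffer);
            committedOffset = offset;
        }
    }
}
```

For example, a schema that splits one Kafka message into several records simply calls {{collect}} multiple times before returning; no extra {{flush}} is exposed to the user.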
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855424#comment-15855424 ] Haohui Mai commented on FLINK-5583: --- The pseudo-code looks like the following:

{code}
void invoke() {
  try {
    Tuple2<K, V> kv = deserialization.deserialize(bytes);
  } catch (Throwable t) {
    // Write the corrupted message to external storage (e.g., Kafka, HDFS)
    deserialization.writeToExternalSources(bytes);
  }
}

void checkpoint() {
  ...
  deserialization.flush();
}
{code}

Strictly speaking, the {{DeserializationSchema}} does not need to have persistent state (which IMO is the right thing to do). However, it does require proper synchronization when the consumer checkpoints. Does it make sense to you [~tzulitai]?
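The try-catch from the pseudo-code above can also live inside the flatMap-style {{deserialize}} itself, as Gordon suggests, so that no separate {{flush}} hook is needed. A minimal, self-contained sketch follows; the class name, the integer "decoding", and the in-memory side store are hypothetical stand-ins for a real Avro decoder and a Kafka/HDFS writer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical schema that skips corrupt records and routes the raw bytes to a
// side store, instead of failing the pipeline. The write to the side store is
// a blocking call made inside deserialize(), as discussed in the thread.
class SkippingSchema {
    // Stand-in for external storage (e.g., a Kafka topic or HDFS file).
    final List<byte[]> corruptStore = new ArrayList<>();

    void deserialize(byte[] message, List<Integer> collector) {
        try {
            // Stand-in for Avro decoding: parse the payload as an integer.
            collector.add(Integer.parseInt(
                new String(message, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            // Corrupt record: write the raw bytes out and emit nothing.
            corruptStore.add(message);
        }
    }
}
```

Because the catch block runs before {{deserialize}} returns, all side effects for a given offset complete before the consumer's atomic flush-and-commit step, which is what makes the schema state-free.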
[GitHub] flink issue #3274: [FLINK-5723][UI]Use Used instead of Initial to make taskm...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3274 @zentol Thanks for review. I've changed .jade file, but it looks like CI is not up properly :( --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5723) Use "Used" instead of "Initial" to make taskmanager tag more readable
[ https://issues.apache.org/jira/browse/FLINK-5723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855412#comment-15855412 ] ASF GitHub Bot commented on FLINK-5723: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3274 @zentol Thanks for review. I've changed .jade file, but it looks like CI is not up properly :( > Use "Used" instead of "Initial" to make taskmanager tag more readable > - > > Key: FLINK-5723 > URL: https://issues.apache.org/jira/browse/FLINK-5723 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Tao Wang >Priority: Trivial > > Now in JobManager web fronted, the used memory of task managers is presented > as "Initial" in table header, which actually means "memory used", from codes. > I'd like change it to be more readable, even it is trivial one. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3281: [FLINK-5727] [table] Unify some API of batch and s...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3281 [FLINK-5727] [table] Unify some API of batch and stream TableEnvironment This PR contains 3 changes: 1. move `sql` to base class 2. unify `scan` and `ingest` to `scan`, and move to base class 3. move `registerTableSource` to base class and change the parameter type from specific `BatchTableSource` or `StreamTableSource` to their base class: `TableSource` You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-5727 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3281 commit 8303b0c1c6320713daf212c6da4d72499c459e42 Author: Kurt Young Date: 2017-02-07T06:15:02Z [FLINK-5727] [table] Unify some API of batch and stream TableEnvironment
[jira] [Commented] (FLINK-5727) Unify some API of batch and stream TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855407#comment-15855407 ] ASF GitHub Bot commented on FLINK-5727: --- GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3281 [FLINK-5727] [table] Unify some API of batch and stream TableEnvironment This PR contains 3 changes: 1. move `sql` to base class 2. unify `scan` and `ingest` to `scan`, and move to base class 3. move `registerTableSource` to base class and change the parameter type from specific `BatchTableSource` or `StreamTableSource` to their base class: `TableSource` You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-5727 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3281 commit 8303b0c1c6320713daf212c6da4d72499c459e42 Author: Kurt YoungDate: 2017-02-07T06:15:02Z [FLINK-5727] [table] Unify some API of batch and stream TableEnvironment > Unify some API of batch and stream TableEnvironment > --- > > Key: FLINK-5727 > URL: https://issues.apache.org/jira/browse/FLINK-5727 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > > Some API's implementation of current {{BatchTableEnvironment}} and > {{StreamTableEnvironment}} is identical, like {{sql}}. And other API like > {{registerTableSource}} and {{scan}} can be unified to the base class. (we > can change {{ingest}} from {{StreamTableEnvironment}} to {{scan}} also) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
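The three changes in the PR above amount to pulling shared behaviour up into a common base class and widening a parameter to the common supertype. A minimal, self-contained sketch of that shape, using simplified stand-in names rather than the real Table API classes:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins for the TableSource hierarchy.
interface TableSource { String name(); }
class BatchSource implements TableSource { public String name() { return "batch"; } }
class StreamSource implements TableSource { public String name() { return "stream"; } }

abstract class BaseTableEnv {
    protected final Map<String, TableSource> catalog = new HashMap<>();

    // Change 1: `sql` was identical in both subclasses, so it moves up.
    String sql(String query) { return "planned:" + query; }

    // Change 2: batch `scan` and stream `ingest` unify under one `scan`.
    TableSource scan(String table) { return catalog.get(table); }

    // Change 3: parameter widened from Batch/StreamTableSource to TableSource.
    void registerTableSource(String name, TableSource source) {
        catalog.put(name, source);
    }
}

// The subclasses keep only what genuinely differs between batch and streaming.
class BatchTableEnv extends BaseTableEnv { }
class StreamTableEnv extends BaseTableEnv { }
```

The design trade-off is the usual one: the base class accepts any {{TableSource}}, so environment-specific validation (batch vs. stream source) would have to happen at registration time rather than via the type system.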
[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...
Github user docete commented on the issue: https://github.com/apache/flink/pull/3111 Agree. If we have more than one distinct agg with groupings, doing the partition first and reusing the subsets would improve the performance. Could we merge this PR first and create another JIRA to track the grouping cases? We need a workaround to support distinct aggs ASAP.
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855397#comment-15855397 ] ASF GitHub Bot commented on FLINK-3475: --- Github user docete commented on the issue: https://github.com/apache/flink/pull/3111 Agree. If we have more than one distinct agg with groupings, do the partition first and reuse the subsets would improve the performance. Could we merge this PR first and create another JIRA to track the grouping cases? We need a workaround to support distinct aggs ASAP. > DISTINCT aggregate function support for SQL queries > --- > > Key: FLINK-3475 > URL: https://issues.apache.org/jira/browse/FLINK-3475 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Chengxiang Li >Assignee: Zhenghua Gao > > DISTINCT aggregate function may be able to reuse the aggregate function > instead of separate implementation, and let Flink runtime take care of > duplicate records. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855383#comment-15855383 ] ASF GitHub Bot commented on FLINK-2168: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - A gentle reminder !!! > Add HBaseTableSource > > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Add a {{HBaseTableSource}} to read data from a HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/3149 @fhueske - A gentle reminder !!!
[jira] [Created] (FLINK-5727) Unify some API of batch and stream TableEnvironment
Kurt Young created FLINK-5727: - Summary: Unify some API of batch and stream TableEnvironment Key: FLINK-5727 URL: https://issues.apache.org/jira/browse/FLINK-5727 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Kurt Young Assignee: Kurt Young Some API's implementation of current {{BatchTableEnvironment}} and {{StreamTableEnvironment}} is identical, like {{sql}}. And other API like {{registerTableSource}} and {{scan}} can be unified to the base class. (we can change {{ingest}} from {{StreamTableEnvironment}} to {{scan}} also) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised
[ https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855325#comment-15855325 ] Tzu-Li (Gordon) Tai commented on FLINK-5702: [~StephanEwen] Good point about changing the default. In this case we might need to think about migrating the {{FlinkKafkaProducerBase}} also though, as changing the default is a user-behaviour breaking change. I'll open a separate JIRA for this. > Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is > compromised > - > > Key: FLINK-5702 > URL: https://issues.apache.org/jira/browse/FLINK-5702 > Project: Flink > Issue Type: Improvement > Components: Documentation, Kafka Connector >Reporter: Tzu-Li (Gordon) Tai > > The documentation for FlinkKafkaProducer does not have any information about > {{setLogFailuresOnly}}. It should emphasize that if users choose to only > log failures instead of failing the sink, at-least-once cannot be guaranteed.
[jira] [Commented] (FLINK-5704) Deprecate FlinkKafkaConsumer constructors in favor of improvements to decoupling from Kafka offset committing
[ https://issues.apache.org/jira/browse/FLINK-5704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855319#comment-15855319 ] Tzu-Li (Gordon) Tai commented on FLINK-5704: FLINK-3679 and FLINK-5583 would probably change the {{DeserializationSchema}} interface. We should include that as part of the Kafka consumer usage migration. > Deprecate FlinkKafkaConsumer constructors in favor of improvements to > decoupling from Kafka offset committing > - > > Key: FLINK-5704 > URL: https://issues.apache.org/jira/browse/FLINK-5704 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > With FLINK-3398 and FLINK-4280, the {{FlinkKafkaConsumer}} will be able to > completely operate independently of committed offsets in Kafka. > I.e., > (1) *Starting position*: when starting, the consumer can choose to not use > any committed offsets in Kafka as the starting position > (2) *Committing offsets back to Kafka*: the consumer can completely opt-out > of committing offsets back to Kafka > However, our current default behaviour for (1) is to respect committed > offsets, and (2) is to always have offset committing. Users still have to > call the respective setter configuration methods to change this. > I think we should deprecate the current constructors in favor of new ones > with default behaviours (1) start from the latest record, without respecting > Kafka offsets, and (2) don't commit offsets. > With this change, users explicitly call the config methods of FLINK-3398 and > FLINK-4280 to *enable* respecting committed offsets for Kafka, instead of > _disabling_ it. They would want to / need to enable it, only when perhaps to > migrate from a non-Flink consuming application, or they wish to expose the > internal checkpointed offsets to measure consumer lag using Kafka toolings. 
> The main advantage for this change is that the API of {{FlinkKafkaConsumer}} > can speak for itself that it does not depend on committed offsets in Kafka > (this is a misconception that users frequently have), and that exactly-once > depends solely on offsets checkpointed internally using Flink's checkpointing > mechanics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855313#comment-15855313 ] Tzu-Li (Gordon) Tai commented on FLINK-5583: Regarding breaking user code when changing to a flatMap-like {{DeserializationSchema}}: I would actually prefer to have a new separate interface instead of breaking the original one. We could make use of FLINK-5704 to migrate to the new deserialization interface.
[jira] [Updated] (FLINK-5726) Add the RocketMQ plugin for the Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Meng updated FLINK-5726: Summary: Add the RocketMQ plugin for the Apache Flink (was: Add the RocketMQ plugin for the Apache Spark) > Add the RocketMQ plugin for the Apache Flink > > > Key: FLINK-5726 > URL: https://issues.apache.org/jira/browse/FLINK-5726 > Project: Flink > Issue Type: Task >Reporter: Longda Feng >Priority: Minor > > Apache RocketMQ® is an open source distributed messaging and streaming data > platform. It has been used in a lot of companies. Please refer to > http://rocketmq.incubator.apache.org/ for more details. > Since the Apache RocketMq 4.0 will be released in the next few days, we can > start the job of adding the RocketMq plugin for the Apache Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5726) Add the RocketMQ plugin for the Apache Spark
[ https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Longda Feng updated FLINK-5726: --- External issue ID: ROCKETMQ-82 > Add the RocketMQ plugin for the Apache Spark > > > Key: FLINK-5726 > URL: https://issues.apache.org/jira/browse/FLINK-5726 > Project: Flink > Issue Type: Task >Reporter: Longda Feng >Priority: Minor > > Apache RocketMQ® is an open source distributed messaging and streaming data > platform. It has been used in a lot of companies. Please refer to > http://rocketmq.incubator.apache.org/ for more details. > Since the Apache RocketMq 4.0 will be released in the next few days, we can > start the job of adding the RocketMq plugin for the Apache Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5726) Add the RocketMQ plugin for the Apache Spark
Longda Feng created FLINK-5726: -- Summary: Add the RocketMQ plugin for the Apache Spark Key: FLINK-5726 URL: https://issues.apache.org/jira/browse/FLINK-5726 Project: Flink Issue Type: Task Reporter: Longda Feng Priority: Minor Apache RocketMQ® is an open source distributed messaging and streaming data platform. It has been used in a lot of companies. Please refer to http://rocketmq.incubator.apache.org/ for more details. Since the Apache RocketMq 4.0 will be released in the next few days, we can start the job of adding the RocketMq plugin for the Apache Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855251#comment-15855251 ] Tzu-Li (Gordon) Tai commented on FLINK-5583: Hi [~wheat9], I'm not quite sure why your case requires the {{DeserializationSchema}} to be stateful. Could you elaborate a bit more? What is the state of the {{DeserializationSchema}} you have in mind?
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855241#comment-15855241 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on the issue: https://github.com/apache/flink/pull/2770 @tillrohrmann , thank you for review! Actually this PR is out-of-date based on flip-6, and my following modifications do not update this PR. This PR should be re-submitted based on master and I forgot to close it. Some above suggestions have already been fixed in the current implementation, and I want to merge this part in #3151 in order to be reviewed easily. > Implement TaskManager side of heartbeat from ResourceManager > - > > Key: FLINK-4354 > URL: https://issues.apache.org/jira/browse/FLINK-4354 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. > The {{TaskManager}} transmits its slot availability with each heartbeat. That > way, the RM will always know about available slots. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2770: [FLINK-4354]Implement TaskManager side of heartbeat from ...
Github user wangzhijiang999 commented on the issue: https://github.com/apache/flink/pull/2770 @tillrohrmann , thank you for review! Actually this PR is out-of-date based on flip-6, and my following modifications do not update this PR. This PR should be re-submitted based on master and I forgot to close it. Some above suggestions have already been fixed in the current implementation, and I want to merge this part in #3151 in order to be reviewed easily.
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855222#comment-15855222 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/2770#discussion_r99737255

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
 // Initialize the TM metrics
 TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+ HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+ taskManagerConfiguration.getTimeout().toMilliseconds(),
+ resourceID,
+ executor,
+ Executors.newSingleThreadScheduledExecutor(),
--- End diff --

It is already added in the new modifications.
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855218#comment-15855218 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/2770#discussion_r99737204

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1099,4 +1121,23 @@ public void run() {
 });
 }
 }
+
+ private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+ ResourceManagerHeartbeatListener() {
+ }
+
+ @Override
+ public void notifyHeartbeatTimeout(ResourceID resourceID) {
+ }
--- End diff --

It will be added in the new modifications.
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855217#comment-15855217 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/2770#discussion_r99737151

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
 * @param optionalDiagnostics
 */
 void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+ /**
+ * sends the heartbeat to resource manager from task manager
+ * @param resourceID unique id of the task manager
+ * @param payload the payload information of the task manager
+ */
+ void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

I agree with it.
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855216#comment-15855216 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/2770#discussion_r99737093

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -137,6 +140,7 @@ public ResourceManager(
 this.jobManagerRegistrations = new HashMap<>(4);
 this.taskExecutors = new HashMap<>(8);
 this.leaderSessionId = null;
+ this.resourceID = ResourceID.generate();
--- End diff --

Yes, in the current implementation it is determined by **ResourceManagerRunner**. This PR was submitted a long time ago, and some dependent work was not complete at that time.
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855212#comment-15855212 ] ASF GitHub Bot commented on FLINK-4354: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/2770#discussion_r99736682

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
 final UUID resourceManagerLeaderId,
 final String taskExecutorAddress,
 final ResourceID resourceID,
- final SlotReport slotReport) {
+ final SlotReport slotReport)
+ {
--- End diff --

It may have been an accidental change; I will be more careful next time.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855209#comment-15855209 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99736386

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -126,6 +129,9 @@
 /** The metric registry in the task manager */
 private final MetricRegistry metricRegistry;
+
+ /** The heartbeat manager for job manager and resource manager in the task manager */
+ private final HeartbeatManagerImpl heartbeatManager;
--- End diff --

Yes, the current **HeartbeatManagerImpl** is enough to use, so I did not wrap it.

> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Zhijiang Wang
> Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and
> the {{TaskManager}} will report metrics info for each heartbeat.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855207#comment-15855207 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on the issue: https://github.com/apache/flink/pull/3151 @tillrohrmann . Thank you for the detailed review and comments! This PR only submits the heartbeat logic on the TM side, because there is already a JIRA for the JM heartbeat side. In my implementation, the JM initiates the heartbeat with **HeartbeatManagerSenderImpl** and the TM responds to the heartbeat with **HeartbeatManagerImpl**, so the heartbeat process is one-way. I think it is better to also submit the JM heartbeat logic in this PR, to make it easier to understand. I will modify this PR soon; as for testing, there already exist unit tests for the basic heartbeat logic. Do you mean to add some ITCases?
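The one-way flow described above can be sketched roughly as follows. This is a simplified, hypothetical model, not Flink's actual heartbeat interfaces (the class and method names are illustrative): the sender side (JM/RM) schedules periodic heartbeat requests, and the receiver side (TM) only answers with its payload and never initiates.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OneWayHeartbeatSketch {

    // Sender side (JM/RM in the discussion above): schedules periodic
    // heartbeat requests to every monitored target and records the answers.
    static final class Sender {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        final ConcurrentHashMap<String, String> lastPayloads = new ConcurrentHashMap<>();

        void monitor(String targetId, Receiver target, long periodMillis) {
            scheduler.scheduleAtFixedRate(
                    () -> target.requestHeartbeat(this, targetId),
                    0, periodMillis, TimeUnit.MILLISECONDS);
        }

        // Called by the receiver in answer to a request; the sender never
        // answers back, which is what makes the process one-way.
        void receiveHeartbeat(String targetId, String payload) {
            lastPayloads.put(targetId, payload);
        }

        void shutdown() {
            scheduler.shutdownNow();
        }
    }

    // Receiver side (TM): only responds to requests with its payload
    // (e.g. a slot report or metrics) and never initiates a heartbeat.
    static final class Receiver {
        void requestHeartbeat(Sender sender, String ownId) {
            sender.receiveHeartbeat(ownId, "slot-report");
        }
    }

    public static void main(String[] args) throws Exception {
        Sender sender = new Sender();
        sender.monitor("tm-1", new Receiver(), 50);
        Thread.sleep(200); // let a few heartbeat rounds happen
        sender.shutdown();
        System.out.println("last payload from tm-1: " + sender.lastPayloads.get("tm-1"));
    }
}
```

In this shape, making the heartbeat two-way (as asked later in the thread) would mean giving the Receiver its own scheduler as well, which is exactly the extra work the TM would have to take on.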
[GitHub] flink issue #3138: #Flink-5522 Storm Local Cluster can't work with powermock
Github user liuyuzhong7 commented on the issue: https://github.com/apache/flink/pull/3138 @StephanEwen What should we do with this pull request?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855194#comment-15855194 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99734627

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
 jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
 }
+ heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
+ @Override
+ public void sendHeartbeat(ResourceID resourceID, Object payload) {
+ jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, Object payload) {
+ throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
--- End diff --

My previous understanding was that the heartbeat is only requested from the RM and JM to the TM, and the TM only responds to it. Do you mean the heartbeat can be requested from both sides? If so, the TM also needs to schedule heartbeat requests at a regular interval.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855179#comment-15855179 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99733319

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
 // Initialize the TM metrics
 TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());
+ HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+ taskManagerConfiguration.getTimeout().toMilliseconds(),
+ resourceID,
+ executor,
+ Executors.newSingleThreadScheduledExecutor(),
--- End diff --

I agree with that; initially I wanted to reuse the executor in **RPCService**, but that needs some modifications. I will introduce it in the Runner so it can be shared among components.
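The sharing idea discussed here can be illustrated with a small sketch (hypothetical names, assuming the runner owns the pool): each component is handed the same `ScheduledExecutorService` instead of creating its own single-threaded pool, so threads are pooled and shutdown happens in one place.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedExecutorSketch {

    // A component that needs periodic work, e.g. a heartbeat manager.
    // It receives a shared executor rather than creating its own.
    static final class PeriodicComponent {
        final AtomicInteger ticks = new AtomicInteger();

        PeriodicComponent(ScheduledExecutorService shared, long periodMillis) {
            shared.scheduleAtFixedRate(ticks::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // One executor owned by the "runner", shared among components.
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        PeriodicComponent rmHeartbeats = new PeriodicComponent(shared, 20);
        PeriodicComponent jmHeartbeats = new PeriodicComponent(shared, 20);

        Thread.sleep(150);
        shared.shutdownNow(); // a single place to stop all periodic work

        System.out.println("rm ticks: " + rmHeartbeats.ticks.get()
                + ", jm ticks: " + jmHeartbeats.ticks.get());
    }
}
```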
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855171#comment-15855171 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99732610

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
 }
 }
+
+ /**
+ * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
+ * and trigger different processes.
+ */
+ private final class JMRMHeartbeatListener implements HeartbeatListener {
+
+ JMRMHeartbeatListener() {
+ }
+
+ @Override
+ public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+ log.info("Notify heartbeat timeout for resourceID {}", resourceID);
--- End diff --

Yes, it actually should trigger some actions on timeout. I did not submit this part yet because I think it is related to the failure detection logic and is supposed to be submitted in another PR. To make the heartbeat mechanism complete, I will add this part in the following modifications.
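The missing timeout action discussed here can be sketched as a watchdog that tracks the last heartbeat per resource and calls a listener once the silence exceeds the timeout. This is a hypothetical sketch of the pattern, not Flink's actual HeartbeatManagerImpl:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatTimeoutSketch {

    // Callback invoked when a monitored resource misses its heartbeats;
    // mirrors the role of notifyHeartbeatTimeout in the diff above.
    interface Listener {
        void notifyHeartbeatTimeout(String resourceId);
    }

    static final class Monitor {
        private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
        private final ScheduledExecutorService watchdog =
                Executors.newSingleThreadScheduledExecutor();

        Monitor(long timeoutMillis, Listener listener) {
            long period = Math.max(1, timeoutMillis / 2);
            // Periodically scan all monitored resources for silence longer
            // than the timeout; ConcurrentHashMap iteration tolerates removal.
            watchdog.scheduleAtFixedRate(() -> {
                long now = System.nanoTime() / 1_000_000;
                for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
                    if (now - e.getValue() > timeoutMillis) {
                        lastSeenMillis.remove(e.getKey()); // stop monitoring the dead target
                        listener.notifyHeartbeatTimeout(e.getKey());
                    }
                }
            }, period, period, TimeUnit.MILLISECONDS);
        }

        // Called whenever a heartbeat answer arrives from the resource.
        void reportHeartbeat(String resourceId) {
            lastSeenMillis.put(resourceId, System.nanoTime() / 1_000_000);
        }

        void shutdown() {
            watchdog.shutdownNow();
        }
    }
}
```

A listener implementation could then trigger the failure handling mentioned in the comment, e.g. disconnecting from the timed-out JM or RM.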
[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...
Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99732047

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
 }
 }
+
+ /**
+ * The heartbeat listener for JobManager and ResourceManager, they can be distinguished by ResourceID
+ * and trigger different processes.
+ */
+ private final class JMRMHeartbeatListener implements HeartbeatListener {
--- End diff --

From the TaskExecutor side, it will monitor the JM and RM that initiate the heartbeat, so JMRMHeartbeatListener handles a heartbeat timeout with either the JM or the RM; the current PR only shows the interaction with the JM part. I planned to submit the RM part in another PR, but maybe submitting both sides together will make it easier to understand.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855154#comment-15855154 ] ASF GitHub Bot commented on FLINK-4364: --- Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99731288

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
 final TaskManagerLocation taskManagerLocation,
 final UUID leaderId,
 @RpcTimeout final Time timeout);
+
+ /**
+ * Send the heartbeat to job manager from task manager
+ *
+ * @param resourceID unique id of the task manager
+ * @param payload the payload information from the task manager
+ */
+ void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

Currently we have not decided which specific payload information should be attached to the heartbeat, so Object is used as a workaround. It should be changed to a specific type once that is confirmed. Which payload do you think is necessary to attach currently?
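One way to retire the Object workaround discussed above, once the payload is decided, is to parameterize the gateway by the payload type. A hypothetical sketch (the SlotReport fields and gateway names are invented for illustration, not the real Flink signatures):

```java
// Hypothetical typed payload: a slot report carried in each heartbeat.
final class SlotReport {
    final int availableSlots;

    SlotReport(int availableSlots) {
        this.availableSlots = availableSlots;
    }
}

// Instead of `Object payload`, the gateway is generic in the payload type,
// so each gateway instance carries a concrete, type-safe payload.
interface HeartbeatGateway<P> {
    void heartbeatFromTaskManager(String resourceId, P payload);
}

// A trivial in-memory implementation used to demonstrate the typing;
// a real gateway would forward the call over RPC instead of storing it.
class RecordingGateway implements HeartbeatGateway<SlotReport> {
    SlotReport last;

    @Override
    public void heartbeatFromTaskManager(String resourceId, SlotReport payload) {
        last = payload;
    }
}

public class TypedPayloadSketch {
    public static void main(String[] args) {
        RecordingGateway gateway = new RecordingGateway();
        gateway.heartbeatFromTaskManager("tm-1", new SlotReport(4));
        System.out.println("available slots: " + gateway.last.availableSlots);
    }
}
```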
[GitHub] flink pull request #3151: [FLINK-4364][runtime]mplement TaskManager side of ...
Github user wangzhijiang999 commented on a diff in the pull request: https://github.com/apache/flink/pull/3151#discussion_r99731288 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java --- @@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId, final TaskManagerLocation taskManagerLocation, final UUID leaderId, @RpcTimeout final Time timeout); + + /** +* Send the heartbeat to job manager from task manager +* +* @param resourceID unique id of the task manager +* @param payload the payload information from the task manager +*/ + void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload); --- End diff -- We have not yet decided which specific payload information should be attached to the heartbeat, so Object is used as a placeholder; it should be changed to a specific type once that is confirmed. Which payload do you think needs to be attached at the moment? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5725) Support JOIN between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5725: --- Component/s: Table API & SQL > Support JOIN between two streams in the SQL API > --- > > Key: FLINK-5725 > URL: https://issues.apache.org/jira/browse/FLINK-5725 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > As described in the title. > This jira proposes to support joining two streaming tables in the SQL API. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854944#comment-15854944 ] Haohui Mai commented on FLINK-5583: --- In general (1) sounds good to me. Taking a closer look, it seems that it might require a stateful API instead of the traditional {{Collector}} APIs. We have a mission-critical use case that needs to write all corrupted messages to a persistent store so that these messages can be inspected and backfilled later. Ideally the {{DeserializationSchema}} could hold some state, which would probably need to be synchronized when checkpoints happen. It might be more natural to deserialize messages as a subsequent stage of the consumer. Thoughts? [~rmetzger] [~tzulitai] > Support flexible error handling in the Kafka consumer > - > > Key: FLINK-5583 > URL: https://issues.apache.org/jira/browse/FLINK-5583 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Haohui Mai >Assignee: Haohui Mai > > We found that it is valuable to allow the applications to handle errors and > exceptions in the Kafka consumer in order to build a robust application in > production. > The context is the following: > (1) We have schematized, Avro records flowing through Kafka. > (2) The decoder implements the DeserializationSchema to decode the records. > (3) Occasionally there are corrupted records (e.g., schema issues). The > streaming pipeline might want to bail out (which is the current behavior) or > to skip the corrupted records depending on the applications. > Two options are available: > (1) Have a variant of DeserializationSchema to return a FlatMap like > structure as suggested in FLINK-3679. > (2) Allow the applications to catch and handle the exception by exposing some > APIs that are similar to the {{ExceptionProxy}}. > Thoughts?
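To make the stateful, collector-based deserialization idea discussed above concrete, here is a minimal self-contained Java sketch. All names (the `Collector` interface, `SkippingDeserializer`, the `"v1:"` schema check) are hypothetical stand-ins, not Flink's actual `DeserializationSchema` API: good records are emitted through the collector, while corrupted raw bytes are retained for later inspection and backfill instead of failing the pipeline.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical collector interface; stand-in for a Flink-style Collector. */
interface Collector<T> {
    void collect(T record);
}

/** Sketch of a deserializer that skips and retains corrupted records. */
class SkippingDeserializer {
    // State held across records: raw bytes of corrupted messages, to be
    // flushed to a persistent store (and snapshotted on checkpoints).
    private final List<byte[]> corrupted = new ArrayList<>();

    /** Deserialize one message; emit good records, stash corrupted ones. */
    void deserialize(byte[] message, Collector<String> out) {
        try {
            out.collect(decode(message));
        } catch (IllegalArgumentException e) {
            // Instead of bailing out, keep the raw bytes for backfill.
            corrupted.add(message);
        }
    }

    // Toy "schema" check standing in for real Avro decoding.
    private String decode(byte[] message) {
        String s = new String(message, StandardCharsets.UTF_8);
        if (!s.startsWith("v1:")) {
            throw new IllegalArgumentException("schema mismatch: " + s);
        }
        return s.substring(3);
    }

    List<byte[]> corruptedRecords() {
        return corrupted;
    }
}
```

As the comment notes, all outputs must be added to the collector before `deserialize` returns for the consumer's checkpoint synchronization to work.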
[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854864#comment-15854864 ] ASF GitHub Bot commented on FLINK-5624: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252 Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` clause. > Support tumbling window on streaming tables in the SQL API > -- > > Key: FLINK-5624 > URL: https://issues.apache.org/jira/browse/FLINK-5624 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > This is a follow up of FLINK-4691. > FLINK-4691 adds supports for group-windows for streaming tables. This jira > proposes to expose the functionality in the SQL layer via the {{GROUP BY}} > clauses, as described in > http://calcite.apache.org/docs/stream.html#tumbling-windows. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252 Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` clause.
[jira] [Created] (FLINK-5725) Support JOIN between two streams in the SQL API
Haohui Mai created FLINK-5725: - Summary: Support JOIN between two streams in the SQL API Key: FLINK-5725 URL: https://issues.apache.org/jira/browse/FLINK-5725 Project: Flink Issue Type: New Feature Reporter: Haohui Mai Assignee: Haohui Mai As described in the title. This jira proposes to support joining two streaming tables in the SQL API. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854818#comment-15854818 ] Haohui Mai commented on FLINK-5624: --- Thanks for the explanation. I took a closer look at the built-in function route. Recognizing the function is relatively straightforward. The problem is that it is difficult for the {{rowtime()}} function to refer to the corresponding {{DataStream}} at the code generation phase. Probably it might be easier to add some system / virtual columns instead. > Support tumbling window on streaming tables in the SQL API > -- > > Key: FLINK-5624 > URL: https://issues.apache.org/jira/browse/FLINK-5624 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > This is a follow up of FLINK-4691. > FLINK-4691 adds supports for group-windows for streaming tables. This jira > proposes to expose the functionality in the SQL layer via the {{GROUP BY}} > clauses, as described in > http://calcite.apache.org/docs/stream.html#tumbling-windows. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5718) Handle JVM Fatal Exceptions in Tasks
[ https://issues.apache.org/jira/browse/FLINK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854677#comment-15854677 ] ASF GitHub Bot commented on FLINK-5718: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/3276#discussion_r99674991 --- Diff: docs/setup/config.md --- @@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)). -- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. +- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend to set this value to `false` as the core state backends currently do not use the managed memory. --- End diff -- Should this warning also be added to `flink-conf.yaml`? 
> Handle JVM Fatal Exceptions in Tasks > > > Key: FLINK-5718 > URL: https://issues.apache.org/jira/browse/FLINK-5718 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Stephan Ewen >Assignee: Stephan Ewen > > The TaskManager catches and handles all types of exceptions right now (all > {{Throwables}}). The intention behind that is: > - Many {{Error}} subclasses are recoverable for the TaskManagers, such as > failure to load/link user code > - We want to give eager notifications to the JobManager in case something > in a task goes wrong. > However, there are some exceptions which should probably simply terminate the > JVM, if caught in the task thread, because they may leave the JVM in a > dysfunctional limbo state: > - {{OutOfMemoryError}} > - {{InternalError}} > - {{UnknownError}} > - {{ZipError}} > These are basically the subclasses of {{VirtualMachineError}}, except for > {{StackOverflowError}}, which is recoverable and usually recovered already by > the time the exception has been thrown and the stack unwound. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3276: [FLINK-5718] [core] TaskManagers exit the JVM on f...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/3276#discussion_r99674991 --- Diff: docs/setup/config.md --- @@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)). -- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. +- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`. If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend to set this value to `false` as the core state backends currently do not use the managed memory. --- End diff -- Should this warning also be added to `flink-conf.yaml`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
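The advice in the documentation diff above can be captured directly in the configuration file, as the review comment suggests. A hedged `flink-conf.yaml` fragment (the keys are the ones named in the diff; the values shown are the documented defaults, here only for illustration):

```yaml
# Managed-memory settings discussed in FLINK-5718 / PR #3276.
# For streaming setups the docs recommend leaving preallocation off,
# since the core state backends do not use managed memory.
taskmanager.memory.segment-size: 32768   # buffer size in bytes (default)
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: false    # recommended for streaming jobs
```

Note the coupling stated in the diff: when `taskmanager.memory.off-heap` is `true`, `taskmanager.memory.preallocate` should also be `true`.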
[GitHub] flink pull request #3280: [Flink-5724] Error in documentation zipping elemen...
GitHub user Fokko opened a pull request: https://github.com/apache/flink/pull/3280 [Flink-5724] Error in documentation zipping elements Because the Scala tab is defined twice, it is not possible to open the Python tab. Please look at the documentation page itself: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/zip_elements_guide.html I believe it is a copy-paste error (which also happens to me too often ;) Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/Fokko/flink fd-fix-docs-zipping-elements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3280 commit f490fbcd3633f77e87d89ba397c2b1ed1a030543 Author: Fokko Driesprong Date: 2017-02-06T20:08:31Z Error in documentation zipping elements
Because the Scala tab is defined twice, it is not possible to open the Python tab.
[jira] [Updated] (FLINK-3163) Configure Flink for NUMA systems
[ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3163: -- Fix Version/s: 1.3.0 > Configure Flink for NUMA systems > > > Key: FLINK-3163 > URL: https://issues.apache.org/jira/browse/FLINK-3163 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.3.0 > > > On NUMA systems Flink can be pinned to a single physical processor ("node") > using {{numactl --membind=$node --cpunodebind=$node }}. Commonly > available NUMA systems include the largest AWS and Google Compute instances. > For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could > configure a single TaskManager with 36 slots or have Flink create two > TaskManagers bound to each of the NUMA nodes, each with 18 slots. > There may be some extra overhead in transferring network buffers between > TaskManagers on the same system, though the fraction of data shuffled in this > manner decreases with the size of the cluster. The performance improvement > from only accessing local memory looks to be significant though difficult to > benchmark. > The JobManagers may fit into NUMA nodes rather than requiring full systems. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
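The {{numactl}} pinning described in FLINK-3163 can be scripted. The sketch below is illustrative only: it assumes `numactl` is installed and `FLINK_HOME` points at a Flink distribution, and the helper function is not part of Flink's startup scripts.

```shell
# Extract the node count from `numactl --hardware` output,
# whose first line looks like: "available: 2 nodes (0-1)".
numa_node_count() {
  awk '/^available:/ {print $2; exit}'
}

# Start one TaskManager pinned (memory and CPUs) to each NUMA node,
# per the approach described in the issue.
start_taskmanagers() {
  nodes=$(numactl --hardware | numa_node_count)
  for node in $(seq 0 $((nodes - 1))); do
    numactl --membind="$node" --cpunodebind="$node" \
      "$FLINK_HOME/bin/taskmanager.sh" start
  done
}
```

On an AWS c4.8xlarge with two NUMA nodes this would start two TaskManagers, matching the 2x18-slot configuration the issue describes.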
[jira] [Updated] (FLINK-2908) Web interface redraw web plan when browser resized
[ https://issues.apache.org/jira/browse/FLINK-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-2908: -- Fix Version/s: 1.3.0 > Web interface redraw web plan when browser resized > -- > > Key: FLINK-2908 > URL: https://issues.apache.org/jira/browse/FLINK-2908 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0 > > > The job plan graph does not resize when the user expands the browser window > (only a change in width matters). > To reproduce: 1) open the plan tab of a running or completed job in a > non-maximized browser window (not full width), 2) maximize the browser window. > Workaround: refresh the web page. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5724) Error in the 'Zipping Elements' docs
Fokko Driesprong created FLINK-5724: --- Summary: Error in the 'Zipping Elements' docs Key: FLINK-5724 URL: https://issues.apache.org/jira/browse/FLINK-5724 Project: Flink Issue Type: Bug Reporter: Fokko Driesprong The tab for the Python documentation isn't working because there are two tabs pointing at the Scala example. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-2883) Add documentation to forbid key-modifying ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-2883. - Resolution: Implemented 1.2: 37d6f8196f12714491c95adb73319bb36e7a0d34 master: 11ebf484280314231d146dcfb0b973934448f00b > Add documentation to forbid key-modifying ReduceFunction > > > Key: FLINK-2883 > URL: https://issues.apache.org/jira/browse/FLINK-2883 > Project: Flink > Issue Type: Task > Components: DataStream API, Documentation >Affects Versions: 0.10.0 >Reporter: Till Rohrmann >Assignee: Greg Hogan > Fix For: 1.3.0, 1.2.1 > > > If one uses a combinable reduce operation which also changes the key value of > the underlying data element, then the results of the reduce operation can > become wrong. The reason is that after the combine phase, another reduce > operator is executed which will then reduce the elements based on the new key > values. This might be not so surprising if one explicitly defined ones > {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} > conceals the fact that a combiner is used implicitly. Furthermore, the API > does not prevent the user from changing the key fields which could solve the > problem. > The following example program illustrates the problem > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > env.setParallelism(1) > val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4)) > val result = input.groupBy(0).reduce{ > (left, right) => > (left._1 + right._1, left._2 + right._2) > } > result.output(new PrintingOutputFormat[Int]()) > env.execute() > {code} > The expected output is > {code} > (2, 5) > (2, 3) > (6, 7) > {code} > However, the actual output is > {code} > (4, 8) > (6, 7) > {code} > I think that the underlying problem is that associativity and commutativity > is not sufficient for a combinable reduce operation. Additionally we also > need to make sure that the key stays the same. 
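The wrong-result mechanism described in FLINK-2883 can be reproduced in plain Java without Flink. The sketch below (all names illustrative) folds tuples by their first field once, simulating the combine phase, and then again on the combined output, simulating the reducer after the shuffle; because the reduce function also sums the key field, the second pass regroups records under the new keys.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Plain-Java simulation (not Flink API) of a combinable grouped reduce
 *  whose ReduceFunction illegally modifies the key field. */
class KeyModifyingReduceDemo {
    // Group int-pair tuples by their first field and fold each group
    // with `reduce`, preserving first-seen key order.
    static List<int[]> reduceByKey(List<int[]> in, BinaryOperator<int[]> reduce) {
        Map<Integer, int[]> groups = new LinkedHashMap<>();
        for (int[] t : in) {
            groups.merge(t[0], t, reduce);
        }
        return List.copyOf(groups.values());
    }
}
```

Running the issue's input (1,2), (1,3), (2,3), (3,3), (3,4) through one pass gives the expected (2,5), (2,3), (6,7); a second pass, as happens after a combiner, merges (2,5) and (2,3) into (4,8), reproducing the wrong output reported in the issue.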
[jira] [Closed] (FLINK-5680) Document env.ssh.opts
[ https://issues.apache.org/jira/browse/FLINK-5680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-5680. - Resolution: Implemented 1.2: 57cd0713183b4a2a2463736f672e1cd74cd913d6 master: 327ff060a7b64862aa6a33b6c8b07bbef05bde76 > Document env.ssh.opts > - > > Key: FLINK-5680 > URL: https://issues.apache.org/jira/browse/FLINK-5680 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.2.0, 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0, 1.2.1 > > > Document {{env.ssh.opts}} in {{setup/config.html}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5680) Document env.ssh.opts
[ https://issues.apache.org/jira/browse/FLINK-5680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854636#comment-15854636 ] ASF GitHub Bot commented on FLINK-5680: --- Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/3247 > Document env.ssh.opts > - > > Key: FLINK-5680 > URL: https://issues.apache.org/jira/browse/FLINK-5680 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.2.0, 1.3.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > Fix For: 1.3.0, 1.2.1 > > > Document {{env.ssh.opts}} in {{setup/config.html}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2883) Add documentation to forbid key-modifying ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854637#comment-15854637 ] ASF GitHub Bot commented on FLINK-2883: --- Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/3256 > Add documentation to forbid key-modifying ReduceFunction > > > Key: FLINK-2883 > URL: https://issues.apache.org/jira/browse/FLINK-2883 > Project: Flink > Issue Type: Task > Components: DataStream API, Documentation >Affects Versions: 0.10.0 >Reporter: Till Rohrmann >Assignee: Greg Hogan > Fix For: 1.3.0, 1.2.1 > > > If one uses a combinable reduce operation which also changes the key value of > the underlying data element, then the results of the reduce operation can > become wrong. The reason is that after the combine phase, another reduce > operator is executed which will then reduce the elements based on the new key > values. This might be not so surprising if one explicitly defined ones > {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} > conceals the fact that a combiner is used implicitly. Furthermore, the API > does not prevent the user from changing the key fields which could solve the > problem. > The following example program illustrates the problem > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > env.setParallelism(1) > val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4)) > val result = input.groupBy(0).reduce{ > (left, right) => > (left._1 + right._1, left._2 + right._2) > } > result.output(new PrintingOutputFormat[Int]()) > env.execute() > {code} > The expected output is > {code} > (2, 5) > (2, 3) > (6, 7) > {code} > However, the actual output is > {code} > (4, 8) > (6, 7) > {code} > I think that the underlying problem is that associativity and commutativity > is not sufficient for a combinable reduce operation. Additionally we also > need to make sure that the key stays the same. 
[GitHub] flink pull request #3256: [FLINK-2883] [docs] Add documentation to forbid ke...
Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/3256
[GitHub] flink pull request #3247: [FLINK-5680] [docs] Document env.ssh.opts
Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/3247
[jira] [Commented] (FLINK-5723) Use "Used" instead of "Initial" to make taskmanager tag more readable
[ https://issues.apache.org/jira/browse/FLINK-5723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854572#comment-15854572 ] ASF GitHub Bot commented on FLINK-5723: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3274 you have to modify the corresponding jade files; modifications to the html files will be overridden the next time someone builds the web-ui. > Use "Used" instead of "Initial" to make taskmanager tag more readable > - > > Key: FLINK-5723 > URL: https://issues.apache.org/jira/browse/FLINK-5723 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Tao Wang >Priority: Trivial > > Now in the JobManager web frontend, the used memory of task managers is presented > as "Initial" in the table header, which, judging from the code, actually means "memory used". > I'd like to change it to be more readable, even if it is a trivial change.
[GitHub] flink issue #3274: [FLINK-5723][UI]Use Used instead of Initial to make taskm...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3274 you have to modify the corresponding jade files; modifications to the html files will be overridden the next time someone builds the web-ui.
[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue
[ https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5376: -- Description: The following are two examples where ordered stream element queue is mentioned: {code} LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + {code} I guess OrderedStreamElementQueue was coded first. was: The following are two examples where ordered stream element queue is mentioned: {code} LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + {code} I guess OrderedStreamElementQueue was coded first. > Misleading log statements in UnorderedStreamElementQueue > > > Key: FLINK-5376 > URL: https://issues.apache.org/jira/browse/FLINK-5376 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Priority: Minor > > The following are two examples where ordered stream element queue is > mentioned: > {code} > LOG.debug("Put element into ordered stream element queue. New filling > degree " + > "({}/{}).", numberEntries, capacity); > return true; > } else { > LOG.debug("Failed to put element into ordered stream element queue > because it " + > {code} > I guess OrderedStreamElementQueue was coded first. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346 ] Ted Yu edited comment on FLINK-5486 at 2/6/17 6:20 PM: --- Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code} was (Author: yuzhih...@gmail.com): Lock on State.bucketStates should be held in the following method: {code} private void handleRestoredBucketState(State restoredState) { Preconditions.checkNotNull(restoredState); for (BucketState bucketState : restoredState.bucketStates.values()) { {code} > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
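The fix suggested in FLINK-5486 is to move the processing of the pending-files map inside the synchronization block, so no entry can be cleared mid-processing. A minimal sketch, with stand-in types rather than Flink's actual {{BucketingSink}} internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the corrected restore path: process and clear the
 *  pending-files map while holding the same lock. */
class RestoreSketch {
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> finalized = new ArrayList<>();

    void handleRestoredBucketState() {
        synchronized (pendingFilesPerCheckpoint) {
            // Both the handling of pending files and the clear() happen
            // under the lock, so a concurrent writer cannot interleave.
            for (List<String> files : pendingFilesPerCheckpoint.values()) {
                finalized.addAll(files);
            }
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```

This mirrors the issue's point: wrapping only the `clear()` in `synchronized` leaves the preceding iteration racy.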
[jira] [Updated] (FLINK-5488) yarnClient should be closed in AbstractYarnClusterDescriptor for error conditions
[ https://issues.apache.org/jira/browse/FLINK-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5488: -- Description: Here is one example: {code} if(jobManagerMemoryMb > maxRes.getMemory() ) { failSessionDuringDeployment(yarnClient, yarnApplication); throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE); } {code} yarnClient implements Closeable. It should be closed in situations where exception is thrown. was: Here is one example: {code} if(jobManagerMemoryMb > maxRes.getMemory() ) { failSessionDuringDeployment(yarnClient, yarnApplication); throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n" + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE); } {code} yarnClient implements Closeable. It should be closed in situations where exception is thrown. > yarnClient should be closed in AbstractYarnClusterDescriptor for error > conditions > - > > Key: FLINK-5488 > URL: https://issues.apache.org/jira/browse/FLINK-5488 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu > > Here is one example: > {code} > if(jobManagerMemoryMb > maxRes.getMemory() ) { > failSessionDuringDeployment(yarnClient, yarnApplication); > throw new YarnDeploymentException("The cluster does not have the > requested resources for the JobManager available!\n" > + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + > jobManagerMemoryMb + "MB. " + NOTE); > } > {code} > yarnClient implements Closeable. > It should be closed in situations where exception is thrown. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
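The leak pattern in FLINK-5488 is the classic one: a `Closeable` client is only closed on the success path, so exception paths leave it open. A hedged sketch of the fix using `try`/`finally` — the types here are stand-ins, not the YARN client API:

```java
import java.io.Closeable;
import java.io.IOException;

/** Illustrative deployment step that must close its client even when
 *  validation fails and an exception is thrown. */
class DeploySketch {
    static void deploy(Closeable client, boolean enoughMemory) throws IOException {
        try {
            if (!enoughMemory) {
                throw new IllegalStateException(
                    "The cluster does not have the requested resources for the JobManager available!");
            }
            // ... normal deployment would continue here ...
        } finally {
            client.close(); // runs whether or not deployment threw
        }
    }
}
```

Equivalently, a try-with-resources statement (`try (YarnClient c = ...) { ... }`) closes the client on every exit path.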
[jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems
[ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854437#comment-15854437 ] ASF GitHub Bot commented on FLINK-3163: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/3249 @StephanEwen from the discussion of FLINK-3163 I also had the idea of `taskmanager.compute.fraction` where the number of slots would be a multiple of the number of cores / vcores. Since Flink processes these as opaque strings the only purpose is to help organize [config page](https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html). I have found YARN-5764, MESOS-5342, and MESOS-314 discussing NUMA support for containers but all are works in progress. I see that Docker supports `--cpuset-cpus` and `--cpuset-mems` in [docker run](https://docs.docker.com/engine/reference/run/) and in [docker compose](https://docs.docker.com/compose/compose-file) config version 2 (using `cpuset`). It's not clear how to dynamically bind Flink to numa nodes without scripting Flink's docker commands. > Configure Flink for NUMA systems > > > Key: FLINK-3163 > URL: https://issues.apache.org/jira/browse/FLINK-3163 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > On NUMA systems Flink can be pinned to a single physical processor ("node") > using {{numactl --membind=$node --cpunodebind=$node }}. Commonly > available NUMA systems include the largest AWS and Google Compute instances. > For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could > configure a single TaskManager with 36 slots or have Flink create two > TaskManagers bound to each of the NUMA nodes, each with 18 slots. > There may be some extra overhead in transferring network buffers between > TaskManagers on the same system, though the fraction of data shuffled in this > manner decreases with the size of the cluster. 
The performance improvement > from only accessing local memory looks to be significant though difficult to > benchmark. > The JobManagers may fit into NUMA nodes rather than requiring full systems. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
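The per-node launch pattern described in the issue can be sketched as a dry-run script that prints one {{numactl}}-wrapped TaskManager start command per NUMA node. This is only an illustration: the node count would normally come from `numactl --hardware`, and the `bin/taskmanager.sh start` command is a hypothetical stand-in for however the deployment actually launches TaskManagers.

```shell
#!/usr/bin/env sh
# Sketch: emit one TaskManager launch command per NUMA node, binding both
# the CPUs and the memory of each TaskManager to a single node.
# NUM_NODES and TM_CMD are illustrative assumptions.
NUM_NODES=2                      # e.g. parsed from `numactl --hardware`
TM_CMD="bin/taskmanager.sh start"

node=0
while [ "$node" -lt "$NUM_NODES" ]; do
  # --membind keeps allocations on the node; --cpunodebind pins the threads.
  echo "numactl --membind=$node --cpunodebind=$node $TM_CMD"
  node=$((node + 1))
done
```

On the c4.8xlarge example from the issue, each emitted TaskManager would then be configured with 18 slots instead of one 36-slot TaskManager spanning both nodes.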
[jira] [Commented] (FLINK-5618) Add queryable state documentation
[ https://issues.apache.org/jira/browse/FLINK-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854417#comment-15854417 ] ASF GitHub Bot commented on FLINK-5618: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3279 > Add queryable state documentation > - > > Key: FLINK-5618 > URL: https://issues.apache.org/jira/browse/FLINK-5618 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Ufuk Celebi > Fix For: 1.2.0, 1.3.0 > > > Adds docs about how to use queryable state.
[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854409#comment-15854409 ] ASF GitHub Bot commented on FLINK-5701: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3278 @tillrohrmann Yes, under the condition you described, at-least-once doesn't hold. I said _might_ mainly considering there is a chance that for every checkpoint barrier, the previous events (i.e. `event1` in your example) have been committed. But yes, essentially this indeterminate behaviour means that it is not at-least-once, so I'm incorrect in saying that it _might_. I was trying to point out that this indeterminate behaviour made it hard to have a stable test for `testAtLeastOnceProducerFailsIfFlushingDisabled()` without relying on sleeping, which ultimately leads to flaky tests. > FlinkKafkaProducer should check asyncException on checkpoints > - > > Key: FLINK-5701 > URL: https://issues.apache.org/jira/browse/FLINK-5701 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html > The problem: > The producer holds a {{pendingRecords}} value that is incremented on each > invoke() and decremented on each callback, used to check if the producer > needs to sync on pending callbacks on checkpoints. > On each checkpoint, we should only consider the checkpoint succeeded iff, > after flushing, {{pendingRecords == 0}} and {{asyncException == null}} > (currently, we’re only checking {{pendingRecords}}). > A quick fix for this is to check and rethrow async exceptions in the > {{snapshotState}} method both before and after flushing, once > {{pendingRecords}} becomes 0.
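The quick fix proposed in the issue (rethrow the async exception both before and after the flush that drains {{pendingRecords}}) can be sketched as follows. The class and method names are illustrative only, not the actual FlinkKafkaProducer implementation:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the checkpoint-time check: a checkpoint may only succeed once
// all pending records are flushed AND no async send error was recorded.
// Names are hypothetical stand-ins for the producer's internal state.
class ProducerCheckpointSketch {
    final AtomicInteger pendingRecords = new AtomicInteger();
    final AtomicReference<Exception> asyncException = new AtomicReference<>();

    void snapshotState() throws Exception {
        checkErroneous();  // rethrow errors that arrived before this checkpoint
        flush();           // block until pendingRecords == 0
        checkErroneous();  // rethrow errors raised during the flush itself
    }

    void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failed to send data to Kafka", e);
        }
    }

    void flush() {
        // The real producer blocks on outstanding send callbacks; here the
        // pending count is drained synchronously for illustration.
        pendingRecords.set(0);
    }
}
```

Checking only {{pendingRecords}} (the current behaviour) would let a checkpoint complete even though some of those records failed to be written, which is exactly the data-loss scenario from the mailing list.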
[jira] [Comment Edited] (FLINK-5564) User Defined Aggregates
[ https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854402#comment-15854402 ] Fabian Hueske edited comment on FLINK-5564 at 2/6/17 5:19 PM: -- Hi [~Shaoxuan Wang], I think it should be possible to split the first three steps as follows: 1. Add the new UDAGG interface and migrate existing aggregation functions to it. We keep the current implementation and just add code and unit tests for the implementations of the aggregation functions that implement the new interface. 2. Use the new aggregation function for batch tables. We switch the implementation for the DataSet runtime code. We still keep the old aggregation functions for the streaming code. 3. Use the new aggregation function for streaming tables. We switch the implementation for the DataStream runtime code. In this step we remove the old aggregation functions and clean up. Addressing 1, 2, and 3 in a single issue will result in a huge PR which will be hard to review. I'd prefer several smaller steps with well-defined scope. Thanks, Fabian was (Author: fhueske): Hi [~Shaoxuan Wang], I think it should be possible to split the first three steps as follows: 1. Add the new UDAGG interface and migrate existing aggregation functions to it. We keep the current implementation and just add code and unit tests for the implementations of the aggregation functions that implement the new interface. 2. Use the new aggregation function for batch tables. We switch the implementation for the DataSet runtime code. We still keep the old aggregation functions for the streaming code. 3. Use the new aggregation function for streaming tables. We switch the implementation for the DataStream runtime code. In this step we remove the old aggregation functions and clean up. Addressing 1, 2, and 3 in a single issue will result in a huge PR which will be hard to review. I'd prefer several smaller steps with well-defined scope. 
Regarding the discussion of the window OVER functions. It would be great if you could post your comment (with which I agree) to the corresponding JIRA issue and the discussion on the dev list, otherwise it might not be noticed. Thanks, Fabian > User Defined Aggregates > --- > > Key: FLINK-5564 > URL: https://issues.apache.org/jira/browse/FLINK-5564 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > User-defined aggregates would be a great addition to the Table API / SQL. > The current aggregate interface is not well suited for external users. > This issue proposes to redesign the aggregate such that we can expose a > better external UDAGG interface to the users. The detailed design proposal > can be found here: > https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit > Motivation: > 1. The current aggregate interface is not very concise for users. One > needs to know the design details of the intermediate Row buffer before > implementing an Aggregate. Seven functions are needed even for a simple Count > aggregate. > 2. Another limitation of the current aggregate function is that it can only be > applied to one single column. There are many scenarios which require the > aggregate function to take multiple columns as inputs. > 3. “Retraction” is not considered or covered in the current Aggregate. > 4. It might be very good to have a local/global aggregate query plan > optimization, which is promising for optimizing UDAGG performance in some > scenarios. > Proposed Changes: > 1. Implement an aggregate dataStream API (Done by > [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582]) > 2. Update all the existing aggregates to use the new aggregate dataStream API > 3. Provide a better User-Defined Aggregate interface > 4. Add retraction support > 5. Add local/global aggregate
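The kind of accumulator-style UDAGG contract under discussion (few methods, multi-column inputs, optional retraction) might look roughly like this. The interface and method names are assumptions sketched from the design proposal, not the final Table API:

```java
// Hypothetical accumulator-style UDAGG contract: far fewer methods than
// the current seven-method interface, and an optional retract() hook for
// retraction support. Names are illustrative, not Flink's actual API.
interface AggregateFunctionSketch<IN, ACC, OUT> {
    ACC createAccumulator();                  // fresh intermediate state
    void accumulate(ACC acc, IN value);       // fold one input row in
    default void retract(ACC acc, IN value) { // optional: undo one input row
        throw new UnsupportedOperationException("retraction not supported");
    }
    OUT getValue(ACC acc);                    // extract the final result
}

// A simple Count aggregate then needs only a handful of small methods,
// rather than knowledge of an intermediate Row buffer layout.
class CountSketch implements AggregateFunctionSketch<Object, long[], Long> {
    public long[] createAccumulator() { return new long[] {0L}; }
    public void accumulate(long[] acc, Object value) { acc[0]++; }
    public void retract(long[] acc, Object value) { acc[0]--; }
    public Long getValue(long[] acc) { return acc[0]; }
}
```

Multi-column inputs would fall out of letting `IN` be a row type, and a local/global split becomes possible once accumulators can be merged.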
[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854396#comment-15854396 ] ASF GitHub Bot commented on FLINK-5575: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3242 > in old releases, warn users and guide them to the latest stable docs > > > Key: FLINK-5575 > URL: https://issues.apache.org/jira/browse/FLINK-5575 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: David Anderson >Assignee: David Anderson > Fix For: 1.0.4, 1.2.0, 1.3.0, 1.1.5 > > > Old versions of Flink (especially version 0.8) are being frequently studied, > downloaded, and used by new users (because Google leads them there). I > propose to guide folks to the latest stable release by adding a link on every > documentation page in the old docs that links to the home page of the latest > stable docs. > The redirect lives at flink.apache.org/q/stable-docs.html, and will need to > be modified with each major release (e.g. when 1.2 is released). > This problem affects all releases before 1.1, but the stats show that 0.8, > 0.9, 0.10, and 1.0 are the most important to deal with.
[jira] [Closed] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-5575. -- Resolution: Fixed Fix Version/s: 1.2.0 1.1.5 1.3.0 1.0.4 Fixed in dd7311e (master), 6986f49 (release-1.2), 3cd7c8e (release-1.1), 63660ca (release-1.0), 43f8261 (release-0.10), 3ea8ae3 (release-0.9), f6f823e (release-0.8).
[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854384#comment-15854384 ] ASF GitHub Bot commented on FLINK-5575: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3239 Could you please close this PR manually? Bot closing only works for the master branch.
[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854383#comment-15854383 ] ASF GitHub Bot commented on FLINK-5575: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3240 Could you please close this PR manually? Bot closing only works for the master branch.
[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854382#comment-15854382 ] ASF GitHub Bot commented on FLINK-5575: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3241 Could you please close this PR manually? Bot closing only works for the master branch.
[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs
[ https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854385#comment-15854385 ] ASF GitHub Bot commented on FLINK-5575: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3238 Could you please close this PR manually? Bot closing only works for the master branch.