[jira] [Commented] (FLINK-9749) Rework Bucketing Sink
[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937374#comment-16937374 ] Shimin Yang commented on FLINK-9749: Hi [~kkl0u], that would be nice. One more question is about Hadoop 2.6 support: I think we don't actually need the truncate method if we use OnCheckpointRollingPolicy.
> Rework Bucketing Sink
> -
>
> Key: FLINK-9749
> URL: https://issues.apache.org/jira/browse/FLINK-9749
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
>
> The BucketingSink has a series of deficits at the moment. Due to the long list of issues, I would suggest adding a new StreamingFileSink with a new and cleaner design.
> h3. Encoders, Parquet, ORC
> - It only efficiently supports row-wise data formats (Avro, JSON, sequence files).
> - Efforts to add (columnar) compression for blocks of data are inefficient, because blocks cannot span checkpoints due to persistence-on-checkpoint.
> - The encoders are part of the {{flink-connector-filesystem}} project, rather than in orthogonal formats projects. This blows up the dependencies of the {{flink-connector-filesystem}} project. As an example, the rolling file sink has dependencies on Hadoop and Avro, which messes up dependency management.
> h3. Use of FileSystems
> - The BucketingSink works only on Hadoop's FileSystem abstraction, does not support Flink's own FileSystem abstraction, and cannot work with the packaged S3, maprfs, and swift file systems.
> - The sink hence needs Hadoop as a dependency.
> - The sink relies on "trying out" whether truncation works, which requires write access to the user's working directory.
> - The sink relies on enumerating and counting files, rather than maintaining its own state, making it less efficient.
> h3. Correctness and Efficiency on S3
> - The BucketingSink relies on strong consistency in the file enumeration, hence may work incorrectly on S3.
> - The BucketingSink relies on persisting streams at intermediate points. This is not working properly on S3, hence there may be data loss on S3.
> h3. .valid-length companion file
> - The valid-length file makes it hard for consumers of the data and should be dropped.
> We track this design in a series of sub issues.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
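The Hadoop 2.6 point in the comment above holds because an on-checkpoint rolling policy closes every in-progress part file when a checkpoint is taken, so a restore never resumes a partially written file and never needs FileSystem.truncate() (which only exists from Hadoop 2.7 on). A minimal, self-contained sketch of that decision logic, using simplified hypothetical stand-in types rather than Flink's real RollingPolicy interface:

```java
// Simplified stand-in for Flink's RollingPolicy contract (hypothetical types,
// not the real org.apache.flink.streaming.api.functions.sink.filesystem API).
interface SketchRollingPolicy {
    boolean shouldRollOnEvent(long currentPartSizeBytes);
    boolean shouldRollOnProcessingTime(long elapsedMillis);
    boolean shouldRollOnCheckpoint();
}

// Rolls only on checkpoint: since no in-progress file ever survives a
// checkpoint, recovery never has to truncate a half-written resumed file.
class OnCheckpointPolicySketch implements SketchRollingPolicy {
    @Override public boolean shouldRollOnEvent(long currentPartSizeBytes) { return false; }
    @Override public boolean shouldRollOnProcessingTime(long elapsedMillis) { return false; }
    @Override public boolean shouldRollOnCheckpoint() { return true; }
}
```

The trade-off is that part files are cut at checkpoint granularity, so checkpoint frequency directly controls file sizes.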
[jira] [Comment Edited] (FLINK-9749) Rework Bucketing Sink
[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935217#comment-16935217 ] Shimin Yang edited comment on FLINK-9749 at 9/22/19 5:41 AM: - [~kkl0u], I understand your concern. However, with an event-time based bucket assigner we do not need to introduce a new mechanism. Although the bucket is related to event time, it is just a special bucket that can take advantage of the timestamp and watermark info in BucketAssigner.Context. Exactly-once still needs to be guaranteed by Flink, but instead of buffering all records in keyed state, an event-time bucket assigner only needs to track the bucket status with operator state. We would still use StreamingFileSink to write to the buckets (the bucket ID is obtained from the window) after messages are processed by the window operator. Finally, I would like to explain why I brought up this discussion. Users want to use Flink to do streaming ETL from Kafka to Hive based on event time and automatically load partitions into a Hive partitioned table, which can eventually replace the batch ETL pipeline. The number of records is huge since it is all raw log messages, so using a window would be far too inefficient. We therefore developed a Hive streaming sink based on a 2PC sink, and use a mechanism quite similar to watermarks to decide when to load a partition into Hive. We also support more complex partitions, such as combining date-time and a key. Now we want to refactor it using StreamingFileSink.
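The idea in the comment above, deriving the bucket purely from event time so that no windowed buffering is needed, can be illustrated with a self-contained sketch. The types below are hypothetical simplifications of BucketAssigner and BucketAssigner.Context, keeping only the timestamp and watermark accessors the comment relies on:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical slice of BucketAssigner.Context: just the event-time info.
interface AssignerContextSketch {
    long timestamp();        // the record's event-time timestamp
    long currentWatermark(); // the operator's current watermark
}

// Event-time bucket assigner sketch: the bucket id is the record's event
// timestamp truncated to the hour, so a downstream loader can consider a
// bucket complete once the watermark passes the end of that hour.
class EventTimeBucketAssignerSketch {
    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    String getBucketId(AssignerContextSketch context) {
        return HOUR_FORMAT.format(Instant.ofEpochMilli(context.timestamp()));
    }

    // A bucket is "complete" when the watermark has passed the end of its hour,
    // which is when a Hive partition could be loaded.
    boolean isBucketComplete(long bucketStartMillis, AssignerContextSketch context) {
        return context.currentWatermark() >= bucketStartMillis + 60 * 60 * 1000L;
    }
}
```

Because the assigner is a pure function of the timestamp, only per-bucket status needs tracking in operator state; no records are buffered in keyed state.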
[jira] [Commented] (FLINK-9749) Rework Bucketing Sink
[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934266#comment-16934266 ] Shimin Yang commented on FLINK-9749: Hi [~kkl0u], it does look like a replication of the window mechanism, but using the window operator to buffer records according to event time would actually cost too much state I/O; using a bucket assigner based on event time instead would be much cheaper.
[jira] [Commented] (FLINK-9749) Rework Bucketing Sink
[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934185#comment-16934185 ] Shimin Yang commented on FLINK-9749: Hi [~kkl0u], is anyone working on an event-time DateTimeBucketAssigner right now?
[jira] [Commented] (FLINK-14003) Add access to customized state other than built in state types
[ https://issues.apache.org/jira/browse/FLINK-14003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925394#comment-16925394 ] Shimin Yang commented on FLINK-14003: - [~carp84], sounds good.
[jira] [Commented] (FLINK-14003) Add access to customized state other than built in state types
[ https://issues.apache.org/jira/browse/FLINK-14003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925116#comment-16925116 ] Shimin Yang commented on FLINK-14003: - [~carp84], what do you think?
[jira] [Created] (FLINK-14003) Add access to customized state other than built in state types
Shimin Yang created FLINK-14003: --- Summary: Add access to customized state other than built in state types Key: FLINK-14003 URL: https://issues.apache.org/jira/browse/FLINK-14003 Project: Flink Issue Type: Wish Components: Runtime / State Backends Reporter: Shimin Yang Currently the keyed state backend supports built-in state types such as value, list, and map. Although these are quite useful, they may not fit all use cases. Users can develop a customized state backend based on a different data store as a plugin, but they do not have access to customized state if its type is not one of the built-in types. I propose to add a getPartitionedState method to the RuntimeContext and KeyedStateStore interfaces to allow users to access their customized state; it will not affect the current code path.
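The proposal above is about exposing state whose type is not one of the built-ins. As a rough illustration only (the names and signature below are hypothetical, not the actual getPartitionedState signature proposed for RuntimeContext/KeyedStateStore), a store could hand back a user-defined state object keyed by name:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Marker for user-defined state objects (hypothetical; Flink's built-ins are
// ValueState, ListState, MapState, and so on).
interface CustomStateSketch { }

// Sketch of the proposed accessor: return (or lazily create) a customized
// state object by name, without requiring its type to be a built-in kind.
class KeyedStateStoreSketch {
    private final Map<String, CustomStateSketch> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <S extends CustomStateSketch> S getPartitionedState(String name, Supplier<S> factory) {
        return (S) states.computeIfAbsent(name, n -> factory.get());
    }
}
```

The key design point is that the store only needs a name and a factory; the state's concrete type stays opaque to the runtime, which is why the existing code path would be unaffected.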
[jira] [Commented] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils
[ https://issues.apache.org/jira/browse/FLINK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773851#comment-16773851 ] Shimin Yang commented on FLINK-11705: - Hi [~till.rohrmann], yes, we will port the methods and inner classes that are still in use to Java.
> remove org.apache.flink.runtime.testingUtils.TestingUtils
> -
>
> Key: FLINK-11705
> URL: https://issues.apache.org/jira/browse/FLINK-11705
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Shimin Yang
> Assignee: Shimin Yang
> Priority: Major
[jira] [Updated] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils
[ https://issues.apache.org/jira/browse/FLINK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-11705: Issue Type: Sub-task (was: Bug) Parent: FLINK-10392
[jira] [Created] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils
Shimin Yang created FLINK-11705: --- Summary: remove org.apache.flink.runtime.testingUtils.TestingUtils Key: FLINK-11705 URL: https://issues.apache.org/jira/browse/FLINK-11705 Project: Flink Issue Type: Bug Components: Tests Reporter: Shimin Yang Assignee: Shimin Yang
[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773144#comment-16773144 ] Shimin Yang commented on FLINK-11593: - Hi [~till.rohrmann], I have created a pull request for this Jira. I hope this patch can catch up with the upcoming release.
> Check & port TaskManagerTest to new code base
> -
>
> Key: FLINK-11593
> URL: https://issues.apache.org/jira/browse/FLINK-11593
> Project: Flink
> Issue Type: Sub-task
> Affects Versions: 1.8.0
> Reporter: Till Rohrmann
> Assignee: Shimin Yang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Check and port {{TaskManagerTest}} to new code base.
[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16771493#comment-16771493 ] Shimin Yang commented on FLINK-11593: - Hi [~till.rohrmann], it will be finished before Friday.
[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769112#comment-16769112 ] Shimin Yang commented on FLINK-11593: - Hi [~till.rohrmann], thanks for the reminder. I am starting work on this.
[jira] [Assigned] (FLINK-11593) Check & port TaskManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-11593: --- Assignee: Shimin Yang
[jira] [Assigned] (FLINK-11364) Check and port TaskManagerFailsITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-11364: --- Assignee: Shimin Yang
> Check and port TaskManagerFailsITCase to new code base if necessary
> -
>
> Key: FLINK-11364
> URL: https://issues.apache.org/jira/browse/FLINK-11364
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Till Rohrmann
> Assignee: Shimin Yang
> Priority: Major
>
> Check and port {{TaskManagerFailsITCase}} to new code base if necessary.
[jira] [Created] (FLINK-11486) Remove RecoveryITCase for jobmanager
Shimin Yang created FLINK-11486: --- Summary: Remove RecoveryITCase for jobmanager Key: FLINK-11486 URL: https://issues.apache.org/jira/browse/FLINK-11486 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Shimin Yang Assignee: Shimin Yang RecoveryITCase under the org.apache.flink.runtime.jobmanager package is based on the legacy mode and should be removed.
[jira] [Updated] (FLINK-10245) Add HBase Streaming Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10245: Summary: Add HBase Streaming Sink (was: Add DataStream HBase Sink)
> Add HBase Streaming Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Shimin Yang
> Assignee: Shimin Yang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Design documentation: [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]
[jira] [Commented] (FLINK-11084) Incorrect ouput after two consecutive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720304#comment-16720304 ] Shimin Yang commented on FLINK-11084: - Hi [~dawidwys], thank you for providing the case! I agree with your idea: the SplitTransformation should be marked deprecated as well, and throwing a hard exception is more feasible.
> Incorrect ouput after two consecutive split and select
> -
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Reporter: Shimin Yang
> Assignee: Shimin Yang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.2
>
> The second OutputSelector of two successive split-and-select operations does not actually rely on the first one: they end up in the same array of OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> Expected result for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> Actual result:
> resultStream => {StreamRecord1}
[jira] [Commented] (FLINK-11084) Incorrect ouput after two consecutive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712255#comment-16712255 ] Shimin Yang commented on FLINK-11084: - Hi [~aljoscha], I don't agree with that. From the point of view of an application developer, it may introduce dirty data and hurt data quality. Users might use this API in more complicated logic than two split calls directly in a row. For example: SplitStream splitLog = dataStream.split(...) DataStream logStream = splitLog.select(...) DataStream errorStream = splitLog.select(...) errorStream.map(...).filter(...).addSink(...) SplitStream splitLogStream = logStream.split(...) DataStream stream1 = splitLogStream.select(...).map(...).addSink(...) DataStream stream2 = splitLogStream.select(...).map(...).addSink(...) This will produce wrong data that is very hard to debug. I think as long as we still provide this API, we should make it right. I have an implementation for this, and if that doesn't work out, we should at least throw an exception to tell users that two consecutive split and select operations are not supported. > Incorrect output after two consecutive split and select > -- > > Key: FLINK-11084 > URL: https://issues.apache.org/jira/browse/FLINK-11084 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > The second OutputSelector of two successive split and select does not > actually depend on the first one; they end up in the same array of > OutputSelectors in DirectedOutput. > For example: > outputSelector1 => \{"name1" or "name2"} > outputSelector2 => \{"name3" or "name4"} > resultStream = > dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3") > expectedResult for input \{StreamRecord1}: > outputSelector1 => \{"name1"} > outputSelector2 => \{"name3"} > resultStream => {} > actualResult: > resultStream => \{StreamRecord1} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11084) Incorrect output after two consecutive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-11084: Summary: Incorrect output after two consecutive split and select (was: Incorrect output after two successive split and select) > Incorrect output after two consecutive split and select > -- > > Key: FLINK-11084 > URL: https://issues.apache.org/jira/browse/FLINK-11084 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > The second OutputSelector of two successive split and select does not > actually depend on the first one; they end up in the same array of > OutputSelectors in DirectedOutput. > For example: > outputSelector1 => \{"name1" or "name2"} > outputSelector2 => \{"name3" or "name4"} > resultStream = > dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3") > expectedResult for input \{StreamRecord1}: > outputSelector1 => \{"name1"} > outputSelector2 => \{"name3"} > resultStream => {} > actualResult: > resultStream => \{StreamRecord1} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11084) Incorrect output after two successive split and select
[ https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711041#comment-16711041 ] Shimin Yang commented on FLINK-11084: - Currently I am creating an OutputSelectorWrapper to hold the current OutputSelector, all preceding parent OutputSelectors, and their selectNames during StreamGraph generation. At runtime, every record will then go through all the OutputSelectors in the right order. > Incorrect output after two successive split and select > - > > Key: FLINK-11084 > URL: https://issues.apache.org/jira/browse/FLINK-11084 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 > > > The second OutputSelector of two successive split and select does not > actually depend on the first one; they end up in the same array of > OutputSelectors in DirectedOutput. > For example: > outputSelector1 => \{"name1" or "name2"} > outputSelector2 => \{"name3" or "name4"} > resultStream = > dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3") > expectedResult for input \{StreamRecord1}: > outputSelector1 => \{"name1"} > outputSelector2 => \{"name3"} > resultStream => {} > actualResult: > resultStream => \{StreamRecord1} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
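The OutputSelectorWrapper idea in the comment could look roughly like the following. This is a hedged sketch of the approach being described; the class and method names are my own, not those of the actual patch:

```python
# Sketch of the proposed fix: a wrapper collects, during StreamGraph
# generation, the chain of (parent OutputSelector, selectNames) pairs,
# and at runtime a record must pass every stage in order.
# Illustrative names, not the actual implementation.

class OutputSelectorWrapper:
    def __init__(self):
        self.chain = []  # [(selector, selected_names), ...], oldest first

    def add(self, selector, selected_names):
        self.chain.append((selector, set(selected_names)))

    def accepts(self, record):
        # Apply every selector in the order the split/selects were chained;
        # the record is forwarded only if each stage selects it.
        return all(selector(record) & selected
                   for selector, selected in self.chain)

wrapper = OutputSelectorWrapper()
wrapper.add(lambda r: {"name1"}, {"name2"})  # first split(...).select("name2")
wrapper.add(lambda r: {"name3"}, {"name3"})  # second split(...).select("name3")
print(wrapper.accepts("StreamRecord1"))      # False: dropped at the first stage
```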
[jira] [Created] (FLINK-11084) Incorrect output after two successive split and select
Shimin Yang created FLINK-11084: --- Summary: Incorrect output after two successive split and select Key: FLINK-11084 URL: https://issues.apache.org/jira/browse/FLINK-11084 Project: Flink Issue Type: Bug Components: Streaming Reporter: Shimin Yang Assignee: Shimin Yang Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1 The second OutputSelector of two successive split and select does not actually depend on the first one; they end up in the same array of OutputSelectors in DirectedOutput. For example: outputSelector1 => \{"name1" or "name2"} outputSelector2 => \{"name3" or "name4"} resultStream = dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3") expectedResult for input \{StreamRecord1}: outputSelector1 => \{"name1"} outputSelector2 => \{"name3"} resultStream => {} actualResult: resultStream => \{StreamRecord1} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11002) Trigger.clear() will not be called after purged window
[ https://issues.apache.org/jira/browse/FLINK-11002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698810#comment-16698810 ] Shimin Yang commented on FLINK-11002: - Trigger#clear will eventually be called from onElement and onProcessingTime via the clearAllState method. > Trigger.clear() will not be called after purged window > -- > > Key: FLINK-11002 > URL: https://issues.apache.org/jira/browse/FLINK-11002 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.6.2 >Reporter: Yuan Yifan >Priority: Major > > The `clear` method > (org.apache.flink.streaming.api.windowing.triggers.Trigger#clear) will not be > called when `TriggerResult.FIRE_AND_PURGE` is returned from > onElement/onProcessingTime/onEventTime; this may mean the trigger cannot > purge its state after the condition has been triggered. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
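A toy model of the semantics being clarified here, assuming the standard window-operator behavior: FIRE_AND_PURGE drops the window contents immediately, while Trigger.clear() only runs later, when the window's state is torn down through the clearAllState path. All names below are illustrative, not Flink's actual classes:

```python
# Simplified model: FIRE emits the window contents, PURGE drops them,
# and Trigger.clear() runs only when clearAllState tears the window down
# (e.g. once the cleanup timer fires). Illustrative, not Flink code.

class WindowState:
    def __init__(self):
        self.contents = ["e1", "e2"]
        self.trigger_cleared = False

    def on_trigger_result(self, result):
        emitted = list(self.contents) if "FIRE" in result else []
        if "PURGE" in result:
            self.contents = []  # purge drops the elements only
        return emitted

    def clear_all_state(self):
        # This is where Trigger.clear() finally runs.
        self.trigger_cleared = True

w = WindowState()
w.on_trigger_result("FIRE_AND_PURGE")
print(w.contents)         # []   -> window contents purged immediately
print(w.trigger_cleared)  # False -> trigger state not yet cleared
w.clear_all_state()       # later, on window cleanup
print(w.trigger_cleared)  # True
```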
[jira] [Updated] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10540: Affects Version/s: (was: 1.8.0) 1.7.0 > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.8.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10540: Affects Version/s: (was: 1.7.0) 1.8.0 > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.8.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690520#comment-16690520 ] Shimin Yang commented on FLINK-6756: Well, I can take this over. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access the open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are some blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10815) Rethink the rescale operation, can we do it async
[ https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679185#comment-16679185 ] Shimin Yang commented on FLINK-10815: - Thanks [~dawidwys]. I will close this issue. > Rethink the rescale operation, can we do it async > - > > Key: FLINK-10815 > URL: https://issues.apache.org/jira/browse/FLINK-10815 > Project: Flink > Issue Type: Improvement > Components: ResourceManager, Scheduler >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Currently, the rescale operation stops the whole job and restarts it with a > different parallelism. But the rescale operation costs a lot and takes a lot of > time to recover if the state size is quite big. > And a long rescale might cause other problems like increased latency and > back pressure. For some circumstances, like a streaming computing cloud > service, users may be very sensitive to latency and resource usage. So it > would be better to make the rescale a cheaper operation. > I wonder if we could make it an async operation, just like a checkpoint. But how > to deal with the keyed state would be a pain. Currently I just want to make > some assumptions to keep things simple: the async rescale operation can only > double the parallelism or halve it. > When scaling up, we can copy the state to the newly created worker and change > the partitioner of the upstream. The best timing might be when we get notified > that a checkpoint completed. But we still need to change the partitioner of > the upstream, so the upstream should buffer the results or block the > computation until the state copy is finished, and then make the partitioner > send different elements with the same key to the same downstream operator. > When scaling down, we can merge the keyed state of two operators and also > change the partitioner of the upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10815) Rethink the rescale operation, can we do it async
[ https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang closed FLINK-10815. --- Resolution: Workaround > Rethink the rescale operation, can we do it async > - > > Key: FLINK-10815 > URL: https://issues.apache.org/jira/browse/FLINK-10815 > Project: Flink > Issue Type: Improvement > Components: ResourceManager, Scheduler >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Currently, the rescale operation stops the whole job and restarts it with a > different parallelism. But the rescale operation costs a lot and takes a lot of > time to recover if the state size is quite big. > And a long rescale might cause other problems like increased latency and > back pressure. For some circumstances, like a streaming computing cloud > service, users may be very sensitive to latency and resource usage. So it > would be better to make the rescale a cheaper operation. > I wonder if we could make it an async operation, just like a checkpoint. But how > to deal with the keyed state would be a pain. Currently I just want to make > some assumptions to keep things simple: the async rescale operation can only > double the parallelism or halve it. > When scaling up, we can copy the state to the newly created worker and change > the partitioner of the upstream. The best timing might be when we get notified > that a checkpoint completed. But we still need to change the partitioner of > the upstream, so the upstream should buffer the results or block the > computation until the state copy is finished, and then make the partitioner > send different elements with the same key to the same downstream operator. > When scaling down, we can merge the keyed state of two operators and also > change the partitioner of the upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10815) Rethink the rescale operation, can we do it async
[ https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678456#comment-16678456 ] Shimin Yang edited comment on FLINK-10815 at 11/7/18 4:19 PM: -- [~till.rohrmann], I would like to hear your opinion before diving into more detail, since I am not very sure whether this is a nice feature and whether it is worth working on. Currently, it's more like a proposal. was (Author: dangdangdang): [~till.rohrmann], I would like to hear your opinion before diving into more detail, since I am not very sure whether this is a nice feature and whether it is worth working on. > Rethink the rescale operation, can we do it async > - > > Key: FLINK-10815 > URL: https://issues.apache.org/jira/browse/FLINK-10815 > Project: Flink > Issue Type: Improvement > Components: ResourceManager, Scheduler >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Currently, the rescale operation stops the whole job and restarts it with a > different parallelism. But the rescale operation costs a lot and takes a lot of > time to recover if the state size is quite big. > And a long rescale might cause other problems like increased latency and > back pressure. For some circumstances, like a streaming computing cloud > service, users may be very sensitive to latency and resource usage. So it > would be better to make the rescale a cheaper operation. > I wonder if we could make it an async operation, just like a checkpoint. But how > to deal with the keyed state would be a pain. Currently I just want to make > some assumptions to keep things simple: the async rescale operation can only > double the parallelism or halve it. > When scaling up, we can copy the state to the newly created worker and change > the partitioner of the upstream. The best timing might be when we get notified > that a checkpoint completed. But we still need to change the partitioner of > the upstream, so the upstream should buffer the results or block the > computation until the state copy is finished, and then make the partitioner > send different elements with the same key to the same downstream operator. > When scaling down, we can merge the keyed state of two operators and also > change the partitioner of the upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10815) Rethink the rescale operation, can we do it async
[ https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678456#comment-16678456 ] Shimin Yang commented on FLINK-10815: - [~till.rohrmann], I would like to hear your opinion before diving into more detail, since I am not very sure whether this is a nice feature and whether it is worth working on. > Rethink the rescale operation, can we do it async > - > > Key: FLINK-10815 > URL: https://issues.apache.org/jira/browse/FLINK-10815 > Project: Flink > Issue Type: Improvement > Components: ResourceManager, Scheduler >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Currently, the rescale operation stops the whole job and restarts it with a > different parallelism. But the rescale operation costs a lot and takes a lot of > time to recover if the state size is quite big. > And a long rescale might cause other problems like increased latency and > back pressure. For some circumstances, like a streaming computing cloud > service, users may be very sensitive to latency and resource usage. So it > would be better to make the rescale a cheaper operation. > I wonder if we could make it an async operation, just like a checkpoint. But how > to deal with the keyed state would be a pain. Currently I just want to make > some assumptions to keep things simple: the async rescale operation can only > double the parallelism or halve it. > When scaling up, we can copy the state to the newly created worker and change > the partitioner of the upstream. The best timing might be when we get notified > that a checkpoint completed. But we still need to change the partitioner of > the upstream, so the upstream should buffer the results or block the > computation until the state copy is finished, and then make the partitioner > send different elements with the same key to the same downstream operator. > When scaling down, we can merge the keyed state of two operators and also > change the partitioner of the upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10815) Rethink the rescale operation, can we do it async
Shimin Yang created FLINK-10815: --- Summary: Rethink the rescale operation, can we do it async Key: FLINK-10815 URL: https://issues.apache.org/jira/browse/FLINK-10815 Project: Flink Issue Type: Improvement Components: ResourceManager, Scheduler Reporter: Shimin Yang Assignee: Shimin Yang Currently, the rescale operation stops the whole job and restarts it with a different parallelism. But the rescale operation costs a lot and takes a lot of time to recover if the state size is quite big. And a long rescale might cause other problems like increased latency and back pressure. For some circumstances, like a streaming computing cloud service, users may be very sensitive to latency and resource usage. So it would be better to make the rescale a cheaper operation. I wonder if we could make it an async operation, just like a checkpoint. But how to deal with the keyed state would be a pain. Currently I just want to make some assumptions to keep things simple: the async rescale operation can only double the parallelism or halve it. When scaling up, we can copy the state to the newly created worker and change the partitioner of the upstream. The best timing might be when we get notified that a checkpoint completed. But we still need to change the partitioner of the upstream, so the upstream should buffer the results or block the computation until the state copy is finished, and then make the partitioner send different elements with the same key to the same downstream operator. When scaling down, we can merge the keyed state of two operators and also change the partitioner of the upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
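The double-or-halve restriction in the proposal lines up well with Flink-style key groups, where keys hash into a fixed number of key groups and key groups are assigned to operators by contiguous ranges. The sketch below is a hedged illustration of that property (Python's `hash()` stands in for Flink's actual murmur hash over the serialized key; constants are illustrative):

```python
# When parallelism doubles, each operator's key-group range splits
# exactly in two, so every new worker receives state from exactly one
# old worker -- which is what makes "copy state, then flip the upstream
# partitioner" plausible. Simplified sketch, not Flink's real code.

MAX_PARALLELISM = 128  # fixed across rescales

def key_group(key):
    return hash(key) % MAX_PARALLELISM

def operator_index(key, parallelism):
    # Range assignment of key groups to operator subtasks.
    return key_group(key) * parallelism // MAX_PARALLELISM

# With parallelism doubled from 2 to 4, every key that lived on old
# operator i lands on new operator 2*i or 2*i + 1:
for key in ["user-1", "user-2", "user-3"]:
    old = operator_index(key, 2)
    new = operator_index(key, 4)
    assert new in (2 * old, 2 * old + 1)
```

Halving the parallelism is the mirror image: each new operator's range is the union of exactly two old ranges, matching the "merge the keyed state of two operators" step in the proposal.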
[jira] [Commented] (FLINK-10703) Race condition in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-10703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674615#comment-16674615 ] Shimin Yang commented on FLINK-10703: - Thanks [~till.rohrmann]. I didn't notice that runAsync is actually executed by the main thread. I will close this issue then. > Race condition in YarnResourceManager > - > > Key: FLINK-10703 > URL: https://issues.apache.org/jira/browse/FLINK-10703 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Race condition on numPendingContainerRequests; this instance variable should > be atomic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10703) Race condition in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-10703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang closed FLINK-10703. --- Resolution: Not A Problem > Race condition in YarnResourceManager > - > > Key: FLINK-10703 > URL: https://issues.apache.org/jira/browse/FLINK-10703 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Race condition on numPendingContainerRequests; this instance variable should > be atomic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10703) Race condition in YarnResourceManager
Shimin Yang created FLINK-10703: --- Summary: Race condition in YarnResourceManager Key: FLINK-10703 URL: https://issues.apache.org/jira/browse/FLINK-10703 Project: Flink Issue Type: Bug Components: YARN Reporter: Shimin Yang Assignee: Shimin Yang Race condition on numPendingContainerRequests; this instance variable should be atomic. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10503) Periodically check for new resources
[ https://issues.apache.org/jira/browse/FLINK-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10503: --- Assignee: Shimin Yang > Periodically check for new resources > > > Key: FLINK-10503 > URL: https://issues.apache.org/jira/browse/FLINK-10503 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to decide when to start scheduling or to rescale, we need to > periodically check for new resources (slots). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10501) Obtain resource overview of cluster
[ https://issues.apache.org/jira/browse/FLINK-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10501: --- Assignee: Shimin Yang > Obtain resource overview of cluster > --- > > Key: FLINK-10501 > URL: https://issues.apache.org/jira/browse/FLINK-10501 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to decide with which parallelism to run, the > {{ExecutionGraphDriver}} needs to obtain an overview over all available > resources. This includes the resources managed by the {{SlotPool}} as well as > not yet allocated resources on the {{ResourceManager}}. This is a temporary > workaround until we adapted the slot allocation protocol to support resource > declaration. Once this is done, we will only take the {{SlotPool’s}} slots > into account. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10500) Let ExecutionGraphDriver react to fail signal
[ https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10500: --- Assignee: Shimin Yang > Let ExecutionGraphDriver react to fail signal > - > > Key: FLINK-10500 > URL: https://issues.apache.org/jira/browse/FLINK-10500 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > In order to scale down when there are not enough resources available or if > TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. > Depending on the failure type and the available set of resources, it can then > decide to scale the job down or simply restart. In the scope of this issue, > the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10618) Introduce catalog for Flink tables
[ https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661688#comment-16661688 ] Shimin Yang commented on FLINK-10618: - Sounds good. Do you have a draft of the design doc? > Introduce catalog for Flink tables > -- > > Key: FLINK-10618 > URL: https://issues.apache.org/jira/browse/FLINK-10618 > Project: Flink > Issue Type: New Feature > Components: SQL Client >Affects Versions: 1.6.1 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > Besides meta objects such as tables that may come from an > {{ExternalCatalog}}, Flink also deals with tables/views/functions that are > created on the fly (in memory) or specified in a configuration file. Those > objects don't belong to any {{ExternalCatalog}}, yet Flink either stores them > in memory, which is non-persistent, or recreates them from a file, which is > a big pain for the user. Those objects are only known to Flink, but Flink > manages them poorly. > Since they are typical objects in a database catalog, it's natural to have a > catalog that manages those objects. The interface will be similar to > {{ExternalCatalog}}, which contains meta objects that are not managed by > Flink. There are several possible implementations of the Flink internal > catalog interface: memory, file, external registry (such as the Confluent schema > registry or the Hive metastore), relational database, etc. > The initial functionality as well as the catalog hierarchy could be very > simple. The basic functionality of the catalog will be mostly creating, altering, > and dropping tables, views, functions, etc. Obviously, this can evolve over > time. > We plan to provide implementations with memory, file, and the Hive metastore, > which will be plugged in at the SQL-Client layer. > Please provide your feedback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235 ] Shimin Yang edited comment on FLINK-10540 at 10/19/18 3:37 AM: --- Looks like it's only used by the flink-storm example in production code. Better to start after storm is removed [~Tison]. was (Author: dangdangdang): Looks like it's only used by the flink-storm example. Better to start after storm is removed [~Tison]. > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235 ] Shimin Yang commented on FLINK-10540: - Looks like it's only used by the flink-storm example. Better to start after storm is removed [~Tison]. > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10540: --- Assignee: Shimin Yang > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651276#comment-16651276 ] Shimin Yang commented on FLINK-10540: - Hi [~Tison], I would love to take this if you haven't worked on it. > Remove legacy FlinkMiniCluster > -- > > Key: FLINK-10540 > URL: https://issues.apache.org/jira/browse/FLINK-10540 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.7.0 >Reporter: TisonKun >Priority: Major > Fix For: 1.7.0 > > > {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer > be used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10552) Provide RichAsyncFunction for scala API
[ https://issues.apache.org/jira/browse/FLINK-10552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10552: Issue Type: Improvement (was: Bug) > Provide RichAsyncFunction for scala API > --- > > Key: FLINK-10552 > URL: https://issues.apache.org/jira/browse/FLINK-10552 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > > Currently, only the Java API provides a RichAsyncFunction abstract class, while > the Scala API does not. It would be nice to provide the same functionality for > the Scala API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils
[ https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10511: Issue Type: Improvement (was: Bug) > Code duplication of creating RPC service in ClusterEntrypoint and > AkkaRpcServiceUtils > - > > Key: FLINK-10511 > URL: https://issues.apache.org/jira/browse/FLINK-10511 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > > The TaskManagerRunner uses AkkaRpcServiceUtils to create the RPC service, but > the ClusterEntrypoint uses a protected method to do the same job. I think it's > better to use the same method in AkkaRpcServiceUtils to reuse the code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism
[ https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649879#comment-16649879 ] Shimin Yang commented on FLINK-10533: - Flink uses slot sharing groups to optimize resource usage and allow subtasks of the same job to share a slot. Sharing is based on tasks, not on the job graph. If you want to use four slots, you can set the parallelism to 4. Please refer to [https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources] for more info. If this resolves your confusion, I will close this issue. > job parallelism equals task slots number but not use the same number of the > task slots as the parallelism > - > > Key: FLINK-10533 > URL: https://issues.apache.org/jira/browse/FLINK-10533 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.3, 1.6.0, 1.6.1 >Reporter: sean.miao >Priority: Major > Attachments: image-2018-10-12-10-35-57-443.png, > image-2018-10-12-10-36-13-503.png > > > I use the Table API and do not use the DataStream API. > > My job has two graphs and each has parallelism two, so the total parallelism is > four; but if I give my job four slots, it uses just two slots. > > !image-2018-10-12-10-36-13-503.png! > !image-2018-10-12-10-35-57-443.png! > > Thanks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
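The slot-sharing arithmetic in the comment can be stated in one line: with the default slot sharing group, a job needs as many slots as its highest operator parallelism, not the sum across operators. A hedged sketch of that rule:

```python
# With the default slot sharing group, one slot can host one subtask of
# every operator in the pipeline, so the slots a job occupies equal the
# maximum operator parallelism, not the sum. Illustrative sketch.

def required_slots(operator_parallelisms):
    # All operators are assumed to share the default slot sharing group.
    return max(operator_parallelisms)

# The reporter's job: two operators ("graphs"), parallelism 2 each.
print(required_slots([2, 2]))  # 2: only two slots used, even with four free

# To occupy four slots, raise the parallelism to 4:
print(required_slots([4, 4]))  # 4
```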
[jira] [Created] (FLINK-10552) Provide RichAsyncFunction for scala API
Shimin Yang created FLINK-10552: --- Summary: Provide RichAsyncFunction for scala API Key: FLINK-10552 URL: https://issues.apache.org/jira/browse/FLINK-10552 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Shimin Yang Assignee: Shimin Yang Currently, only the Java API provides a RichAsyncFunction abstract class, while the Scala API does not. It would be nice to provide the same functionality for the Scala API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils
[ https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647692#comment-16647692 ] Shimin Yang commented on FLINK-10511: - [~till.rohrmann], good point. We can add an overloaded createRpcService function that takes a port range as a parameter and uses BootstrapTools to start an actor system in AkkaRpcServiceUtils. > Code duplication of creating RPC service in ClusterEntrypoint and > AkkaRpcServiceUtils > - > > Key: FLINK-10511 > URL: https://issues.apache.org/jira/browse/FLINK-10511 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > > The TaskManagerRunner uses AkkaRpcServiceUtils to create its RPC service, but > the ClusterEntrypoint uses a protected method to do the same job. I think it's > better to use the same method in AkkaRpcServiceUtils to reuse code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10166: --- Assignee: (was: Shimin Yang) > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > Fix For: 1.7.0 > > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10500) Let ExecutionGraphDriver react to fail signal
[ https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643038#comment-16643038 ] Shimin Yang commented on FLINK-10500: - Hi [~till.rohrmann], have you started working on this? I would love to take this after the decoupling of ExecutionGraph in JobMaster. > Let ExecutionGraphDriver react to fail signal > - > > Key: FLINK-10500 > URL: https://issues.apache.org/jira/browse/FLINK-10500 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > In order to scale down when there are not enough resources available or if > TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. > Depending on the failure type and the available set of resources, it can then > decide to scale the job down or simply restart. In the scope of this issue, > the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils
Shimin Yang created FLINK-10511: --- Summary: Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils Key: FLINK-10511 URL: https://issues.apache.org/jira/browse/FLINK-10511 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: Shimin Yang Assignee: Shimin Yang The TaskManagerRunner uses AkkaRpcServiceUtils to create its RPC service, but the ClusterEntrypoint uses a protected method to do the same job. I think it's better to use the same method in AkkaRpcServiceUtils to reuse code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621468#comment-16621468 ] Shimin Yang commented on FLINK-10245: - Hi [~hequn8128], I have updated the document; we can discuss how to implement the merge there. > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
[ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620386#comment-16620386 ] Shimin Yang commented on FLINK-10333: - Hi [~till.rohrmann], is there a discussion thread for this issue? > Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, > CompletedCheckpoints) > - > > Key: FLINK-10333 > URL: https://issues.apache.org/jira/browse/FLINK-10333 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > While going over the ZooKeeper based stores > ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, > {{ZooKeeperCompletedCheckpointStore}}) and the underlying > {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were > introduced with past incremental changes. > * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} > or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization > problems will either lead to removing the Znode or not > * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of > exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case > of a failure) > * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be > better to move {{RetrievableStateStorageHelper}} out of it for a better > separation of concerns > * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even > if it is locked. This should not happen since it could leave another system > in an inconsistent state (imagine a changed {{JobGraph}} which restores from > an old checkpoint) > * Redundant but also somewhat inconsistent put logic in the different stores > * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} > which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}} > * Getting rid of the {{SubmittedJobGraphListener}} would be helpful > These problems made me wonder how reliably these components actually work. > Since these components are very important, I propose to refactor them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10000) There's a mistake in the comments of CoGroupedStreams.java
[ https://issues.apache.org/jira/browse/FLINK-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang closed FLINK-10000. --- Resolution: Won't Fix > There's a mistake in the comments of CoGroupedStreams.java > -- > > Key: FLINK-10000 > URL: https://issues.apache.org/jira/browse/FLINK-10000 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > Labels: pull-request-available > > The example of {{CoGroupedStream}} > {code} > DataStream result = one.coGroup(two) > .where(new MyFirstKeySelector()) > .equalTo(new MyFirstKeySelector()) > .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) > .apply(new MyCoGroupFunction()); > {code} > the parameter of method {{equalTo}} should be {{MySecondKeySelector}} instead of > {{MyFirstKeySelector}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10337) Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620357#comment-16620357 ] Shimin Yang commented on FLINK-10337: - [~till.rohrmann], sounds reasonable. I will follow the parent issue. > Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore > > > Key: FLINK-10337 > URL: https://issues.apache.org/jira/browse/FLINK-10337 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > Use {{ZooKeeperStateStore}} in {{ZooKeeperCompletedCheckpointStore}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10344) Rethink SubmittedJobGraphListener
[ https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620356#comment-16620356 ] Shimin Yang commented on FLINK-10344: - Hi [~SleePy], is there any relation between this issue and your attachment? > Rethink SubmittedJobGraphListener > - > > Key: FLINK-10344 > URL: https://issues.apache.org/jira/browse/FLINK-10344 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > Attachments: Tornado.pdf, Tornado.pdf > > > The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can > return false positives. This is obviously problematic, because it causes the > subsequent recovery operation to fail. Ideally we would not require the > {{SubmittedJobGraphListener}}. One could, for example, periodically check > from the main thread whether there are new jobs. That way we would know which > jobs are currently running and which are being cleaned up. > Alternatively it is necessary to tolerate false positives :-( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10206) Add hbase sink connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620069#comment-16620069 ] Shimin Yang commented on FLINK-10206: - [~hequn8128] [~twalthr] [~till.rohrmann] Hi all, I have updated the design document. I made some proposals on how to implement the buffer, and I still need some advice. Could you review it when you are free? > Add hbase sink connector > > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase source connector for batch operation. > > In some cases, we need to save Streaming/Batch results into hbase. Just like > cassandra streaming/Batch sink implementations. > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10206) Add hbase sink connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10206: Description: Now, there is a hbase source connector for batch operation. In some cases, we need to save Streaming/Batch results into hbase. Just like cassandra streaming/Batch sink implementations. Design documentation: [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] was: Now, there is a hbase source connector for batch operation. In some cases, we need to save Streaming/Batch results into hbase. Just like cassandra streaming/Batch sink implementations. > Add hbase sink connector > > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase source connector for batch operation. > > In some cases, we need to save Streaming/Batch results into hbase. Just like > cassandra streaming/Batch sink implementations. > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618688#comment-16618688 ] Shimin Yang commented on FLINK-10245: - [~hequn8128] Sounds good. > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618686#comment-16618686 ] Shimin Yang commented on FLINK-10245: - Hi [~yuanoOo], I think the Increment operation should be in the business logic, not the Sink. If we retrieve a value from HBase and then increment it by one, it will violate consistency during recovery. For the PV scenario, I suggest computing the count with the Flink API and writing it to HBase in an overwrite fashion. > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
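The consistency argument above can be made concrete with a small self-contained sketch (a HashMap stands in for the HBase table, and repeated calls model Flink re-delivering a record after restoring from a checkpoint; none of this is Flink or HBase API): a sink-side Increment double-counts on replay, while overwriting a pre-aggregated count is idempotent.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of why read-modify-write increments in the sink break
// exactly-once semantics on recovery, while overwriting a count that
// Flink pre-aggregated is idempotent. A HashMap stands in for HBase.
class IdempotentSinkSketch {
    /** Increment in the sink: replayed records are counted twice. */
    static long countAfterReplaysWithIncrement(int deliveries) {
        Map<String, Long> table = new HashMap<>();
        for (int i = 0; i < deliveries; i++) {
            table.merge("pv", 1L, Long::sum); // read-modify-write
        }
        return table.get("pv");
    }

    /** Overwrite a pre-aggregated count: replayed writes are harmless. */
    static long countAfterReplaysWithOverwrite(int deliveries) {
        Map<String, Long> table = new HashMap<>();
        for (int i = 0; i < deliveries; i++) {
            table.put("pv", 1L); // idempotent put of the computed count
        }
        return table.get("pv");
    }

    public static void main(String[] args) {
        // One logical event, delivered twice because of a replay:
        System.out.println(countAfterReplaysWithIncrement(2)); // prints 2 (wrong)
        System.out.println(countAfterReplaysWithOverwrite(2)); // prints 1 (correct)
    }
}
```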
[jira] [Assigned] (FLINK-10344) Rethink SubmittedJobGraphListener
[ https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10344: --- Assignee: Shimin Yang > Rethink SubmittedJobGraphListener > - > > Key: FLINK-10344 > URL: https://issues.apache.org/jira/browse/FLINK-10344 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can > return false positives. This is obviously problematic, because it causes the > subsequent recovery operation to fail. Ideally we would not require the > {{SubmittedJobGraphListener}}. One could, for example, periodically check > from the main thread whether there are new jobs. That way we would know which > jobs are currently running and which are being cleaned up. > Alternatively it is necessary to tolerate false positives :-( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10337) Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10337: --- Assignee: Shimin Yang > Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore > > > Key: FLINK-10337 > URL: https://issues.apache.org/jira/browse/FLINK-10337 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Major > Fix For: 1.7.0 > > > Use {{ZooKeeperStateStore}} in {{ZooKeeperCompletedCheckpointStore}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613318#comment-16613318 ] Shimin Yang commented on FLINK-10166: - Hi [~twalthr]. Well, not many times. Maybe we could add some hint in the log instead of throwing an exception with the message "Table program cannot be compiled. This is a bug. Please file an issue.". > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Shimin Yang >Priority: Blocker > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613318#comment-16613318 ] Shimin Yang edited comment on FLINK-10166 at 9/13/18 10:55 AM: --- Hi [~twalthr]. Well, not many times. Maybe we could add some hint in the log instead of just throwing an exception with the message "Table program cannot be compiled. This is a bug. Please file an issue.". was (Author: dangdangdang): Hi [~twalthr]. Well, not many times. Maybe we could add some hint in the log instead of throw a exception with message "Table program cannot be compiled. This is a bug. Please file an issue.". > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Shimin Yang >Priority: Blocker > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10166: --- Assignee: Shimin Yang > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Assignee: Shimin Yang >Priority: Blocker > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client
[ https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613136#comment-16613136 ] Shimin Yang commented on FLINK-10166: - Hi [~dawidwys] [~StephanEwen], I think a more elegant way to solve this problem is to not relocate commons-codec. What do you think? > Dependency problems when executing SQL query in sql-client > -- > > Key: FLINK-10166 > URL: https://issues.apache.org/jira/browse/FLINK-10166 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > > When tried to run query: > {code} > select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name) > {code} > in {{sql-client.sh}} I got: > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown > variable or type "org.apache.commons.codec.binary.Base64" > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10330) Code Compilation failed in local mode
[ https://issues.apache.org/jira/browse/FLINK-10330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang closed FLINK-10330. --- Resolution: Duplicate > Code Compilation failed in local mode > - > > Key: FLINK-10330 > URL: https://issues.apache.org/jira/browse/FLINK-10330 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.5.0, 1.6.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1, 1.5.4 > > Attachments: error log > > > Janino compiler in flink-table can not find type Base64 in commons-codec. > [^error log] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10330) Code Compilation failed in local mode
Shimin Yang created FLINK-10330: --- Summary: Code Compilation failed in local mode Key: FLINK-10330 URL: https://issues.apache.org/jira/browse/FLINK-10330 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0, 1.5.0 Reporter: Shimin Yang Assignee: Shimin Yang Fix For: 1.6.1, 1.5.4 Attachments: error log Janino compiler in flink-table can not find type Base64 in commons-codec. [^error log] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10327: --- Assignee: Piotr Nowojski (was: Shimin Yang) > Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction > --- > > Key: FLINK-10327 > URL: https://issues.apache.org/jira/browse/FLINK-10327 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Currently {{CoProcessFunction}} can not react to changes watermark > advancement. By passing {{processWatermark}} calls to function we would give > a way to perform some actions on watermark advancement, like state clean up > or emitting some results after accumulating some data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612278#comment-16612278 ] Shimin Yang commented on FLINK-10327: - Sounds good, I'll work on it. > Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction > --- > > Key: FLINK-10327 > URL: https://issues.apache.org/jira/browse/FLINK-10327 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Piotr Nowojski >Assignee: Shimin Yang >Priority: Major > > Currently {{CoProcessFunction}} can not react to changes watermark > advancement. By passing {{processWatermark}} calls to function we would give > a way to perform some actions on watermark advancement, like state clean up > or emitting some results after accumulating some data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10327: --- Assignee: Shimin Yang > Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction > --- > > Key: FLINK-10327 > URL: https://issues.apache.org/jira/browse/FLINK-10327 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Piotr Nowojski >Assignee: Shimin Yang >Priority: Major > > Currently {{CoProcessFunction}} can not react to changes watermark > advancement. By passing {{processWatermark}} calls to function we would give > a way to perform some actions on watermark advancement, like state clean up > or emitting some results after accumulating some data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612017#comment-16612017 ] Shimin Yang commented on FLINK-10322: - Cool. > Unused instance variable MetricRegistry in ResourceManager > -- > > Key: FLINK-10322 > URL: https://issues.apache.org/jira/browse/FLINK-10322 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > > Same as the title. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611714#comment-16611714 ] Shimin Yang commented on FLINK-10322: - Hi [~Tison], do you have any suggestion about what to add in the mertrics of RM? > Unused instance variable MetricRegistry in ResourceManager > -- > > Key: FLINK-10322 > URL: https://issues.apache.org/jira/browse/FLINK-10322 > Project: Flink > Issue Type: Improvement > Components: ResourceManager >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Minor > > Same as the title. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager
Shimin Yang created FLINK-10322: --- Summary: Unused instance variable MetricRegistry in ResourceManager Key: FLINK-10322 URL: https://issues.apache.org/jira/browse/FLINK-10322 Project: Flink Issue Type: Improvement Components: ResourceManager Reporter: Shimin Yang Assignee: Shimin Yang Same as the title. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611491#comment-16611491 ] Shimin Yang commented on FLINK-10245: - [~hequn8128], we could make deduplication an optimization for something like append mode, or update mode without nullable fields. > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611487#comment-16611487 ] Shimin Yang commented on FLINK-10245: - Hi [~hequn8128], I have been thinking about whether we should allow nullable fields. If nullable fields are allowed, then the mutations cannot be deduplicated by rowkey. Meanwhile, I also think the deduplication is not suitable for the Retract or Upsert Table Sink that I will implement in the future. But keeping a memory buffer and using the batch API might be a good idea. I will try to work on it. > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16609013#comment-16609013 ] Shimin Yang commented on FLINK-10247: - [~till.rohrmann] It could be a good way to decouple the MetricsService from other components, but I am not sure whether there is any way to configure the priority of these two actor systems. > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
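One way to read "its own dedicated thread pool" is sketched below with plain java.util.concurrent (the class and thread names are illustrative, not Flink's actual MetricQueryService wiring): work submitted to a dedicated single-threaded pool runs on its own named daemon thread, isolated from the main components.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of giving a metrics service its own dedicated thread pool so
// that slow metric queries neither starve nor are starved by the main
// components. Names are illustrative, not Flink's actual wiring.
class DedicatedMetricsPool {
    /** A single-threaded pool with a recognizable daemon thread. */
    static ExecutorService createMetricsPool() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "metrics-query-service");
            t.setDaemon(true); // never blocks JVM shutdown
            return t;
        });
    }

    /** Runs a task on the pool and reports which thread executed it. */
    static String threadNameOnPool() {
        ExecutorService pool = createMetricsPool();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOnPool()); // prints "metrics-query-service"
    }
}
```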
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608642#comment-16608642 ] Shimin Yang commented on FLINK-10245: - Hi [~hequn8128], For the comments you mentioned last time, I looked into the HBase client implementation and think I can add a scheduler that flushes the data periodically at an interval set by the user. I am not sure whether I should replace the API with the HBase batch API, since the current one already provides buffer and flush functionality. But if I stick with this API, I think it is hard to deduplicate data by rowkey, because the data is buffered in the HBase client's BufferedMutator and no function is provided to remove a buffered mutation. What do you think? Best Shimin > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
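The difficulty discussed above suggests keeping the buffer on the sink side rather than inside the HBase client. A minimal self-contained sketch of that idea (class and method names are hypothetical; nothing here is HBase or Flink API): a map keyed by rowkey so a later mutation replaces an earlier one, flushed as one batch once a size threshold is reached. A real sink would additionally flush on a timer and on checkpoints, and hand the batch to HBase's batch API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink-side buffer that deduplicates mutations by rowkey:
// a newer value for the same row replaces the buffered one, and the
// whole batch is handed off once a size threshold is reached. Flushed
// batches are just collected here so the behavior can be observed.
class RowKeyDedupBuffer {
    private final Map<String, String> pending = new LinkedHashMap<>();
    private final int flushThreshold;
    private final List<Map<String, String>> flushedBatches = new ArrayList<>();

    RowKeyDedupBuffer(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    /** Buffer a mutation; the latest value per rowkey wins. */
    void add(String rowKey, String value) {
        pending.put(rowKey, value);
        if (pending.size() >= flushThreshold) {
            flush();
        }
    }

    /** Hand the deduplicated batch to the (stand-in) batch write path. */
    void flush() {
        if (!pending.isEmpty()) {
            flushedBatches.add(new LinkedHashMap<>(pending));
            pending.clear();
        }
    }

    List<Map<String, String>> getFlushedBatches() {
        return flushedBatches;
    }

    public static void main(String[] args) {
        RowKeyDedupBuffer buffer = new RowKeyDedupBuffer(2);
        buffer.add("row1", "a");
        buffer.add("row1", "b"); // replaces "a"; still one pending mutation
        buffer.add("row2", "c"); // threshold reached -> flush one batch
        System.out.println(buffer.getFlushedBatches()); // prints [{row1=b, row2=c}]
    }
}
```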
[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607989#comment-16607989 ] Shimin Yang commented on FLINK-10245: - Hi [~hequn8128], Thanks for the reminder; I have opened comment access. For the buffer part, I will take a look at it and see what I can do. Best, Shimin > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607989#comment-16607989 ] Shimin Yang edited comment on FLINK-10245 at 9/8/18 9:50 AM: - Hi [~hequn8128], Thanks for your remind and I have opened the comment access. For the buffer part, I will take a look at it and see what I can do. Best, Shimin was (Author: dangdangdang): Hi [~hequn8128], Thanks for your remind and I have opened the comment access. For the buffer part, I will take a loot at it and see what I can do. Best, Shimin > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607114#comment-16607114 ] Shimin Yang commented on FLINK-10247: - Hi [~Zentol], sounds good. I will start working on it. > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10247: --- Assignee: Shimin Yang > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reopened FLINK-9567: This issue also occurs when using the region failover strategy. In that case, the pending slots should also be checked when starting a new worker and on container allocation, before requesting a new Yarn container. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.1, 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt > > > After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does not > release task manager containers in certain cases. In the worst case, I > had a job configured with 5 task managers, but it possessed more than 100 containers > in the end. Although the job did not fail, it affected other jobs in the > Yarn Cluster. > In the first log I posted, the container with id 24 is the reason why Yarn > did not release resources. The container was killed before the restart, but the > *onContainerComplete* callback in *YarnResourceManager*, which should be > invoked by Yarn's *AMRMAsyncClient*, was never received. > After the restart, as we can see in line 347 of the FlinkYarnProblem log, > 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - > Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has > failed, address is now gated for [50] ms. Reason: [Disassociated] > Flink lost the connection to container 24, which is on the bd-r1hdp69 machine. > When it tried to call *closeTaskManagerConnection* in *onContainerComplete*, it > no longer had a connection to the TaskManager on container 24, so it simply > ignored the close of the TaskManager. 
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No > open TaskExecutor connection container_1528707394163_29461_02_24. > Ignoring close TaskExecutor connection. > However, before calling *closeTaskManagerConnection*, it had already called > *requestYarnContainer*, which caused the *numPendingContainerRequests* variable > in *YarnResourceManager* to increase by 1. > As the return of excess containers is determined by the > *numPendingContainerRequests* variable in *YarnResourceManager*, this container > cannot be returned although it is not required. Meanwhile, the restart > logic has already allocated enough containers for the Task Managers, so Flink > holds the extra container for a long time for nothing. > In the full log, the job ended with 7 containers while only 3 were running > TaskManagers. > P.S.: Another strange thing I found is that sometimes when requesting a yarn > container, Yarn returns many more than requested. Is this a normal scenario > for AMRMAsyncClient? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
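The bookkeeping problem described in this report can be sketched with a small, self-contained model. The class and method names below are illustrative stand-ins for the behavior of Flink's YarnResourceManager, not its actual code: the point is that a container completion should only trigger a replacement request when a live TaskManager connection was actually lost, otherwise `numPendingContainerRequests` drifts upward and excess containers are never returned.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the counter drift described above.
class ContainerBookkeeping {
    private int numPendingContainerRequests = 0;
    private final Set<String> connectedTaskManagers = new HashSet<>();

    void requestContainer() { numPendingContainerRequests++; }

    void onContainerAllocated(String id) {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            connectedTaskManagers.add(id);
        }
        // else: excess container, which could be returned to Yarn immediately
    }

    void onContainerCompleted(String id) {
        // Only ask for a replacement when we actually lose a live TaskManager;
        // a stale completion for an already-disconnected container is ignored.
        if (connectedTaskManagers.remove(id)) {
            requestContainer();
        }
    }

    int pending() { return numPendingContainerRequests; }
}
```

In the bug scenario, the replacement was requested unconditionally, so a completion for a container with no open TaskManager connection (container 24 above) still incremented the pending count.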
[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool
[ https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605338#comment-16605338 ] Shimin Yang commented on FLINK-10247: - Hi [~till.rohrmann], do you mean that we should move MetricQueryService out of the Akka Actor System and use a dedicated thread pool to receive the messages and deal with them? Cheers > Run MetricQueryService in separate thread pool > -- > > Key: FLINK-10247 > URL: https://issues.apache.org/jira/browse/FLINK-10247 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.6.1, 1.7.0, 1.5.4 > > > In order to make the {{MetricQueryService}} run independently of the main > Flink components, it should get its own dedicated thread pool assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
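The core idea behind the ticket, running the metric query service on its own dedicated thread pool so that load on the main components cannot starve it, can be sketched as follows. This is a minimal illustration using plain `java.util.concurrent`; the class name and behavior are assumptions for the sketch, not Flink's actual MetricQueryService implementation.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a service with its own single-thread executor, so its
// work runs on a dedicated, low-priority daemon thread independent of the
// executors used by other components.
class IsolatedMetricService {
    private final ExecutorService metricPool =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "metric-query-service");
            t.setDaemon(true);
            t.setPriority(Thread.MIN_PRIORITY); // keep it from competing with main work
            return t;
        });

    // Returns the name of the thread that served the query, demonstrating
    // that the work ran on the dedicated pool.
    String queryMetricsBlocking() {
        try {
            return metricPool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() { metricPool.shutdown(); }
}
```

Note that thread priority is only a hint to the OS scheduler, which relates to the question above: there is no hard guarantee of relative priority between two thread pools (or actor systems), only isolation of their queues.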
[jira] [Commented] (FLINK-10206) Add hbase sink connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602287#comment-16602287 ] Shimin Yang commented on FLINK-10206: - [~hequn8128] Thanks for your advice, I have attached a link to the design document in FLINK-10245. > Add hbase sink connector > > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase source connector for batch operation. > > In some cases, we need to save Streaming/Batch results into hbase. Just like > cassandra streaming/Batch sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10245) Add DataStream HBase Sink
[ https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang updated FLINK-10245: Description: Design documentation: [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] > Add DataStream HBase Sink > - > > Key: FLINK-10245 > URL: https://issues.apache.org/jira/browse/FLINK-10245 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Major > Labels: pull-request-available > > Design documentation: > [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10277) Add AppendStreamTableSink and UpsertStreamTableSink
Shimin Yang created FLINK-10277: --- Summary: Add AppendStreamTableSink and UpsertStreamTableSink Key: FLINK-10277 URL: https://issues.apache.org/jira/browse/FLINK-10277 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Shimin Yang Assignee: Shimin Yang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10206) Add hbase sink connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598754#comment-16598754 ] Shimin Yang commented on FLINK-10206: - Hi [~hequn8128] , I am planning to implement the table sink for append and retract modes, but I think I should finish the datastream sink first, since the table sink relies on the datastream sink. BTW, should I just add a link to the design document in the PR, or propose a FLIP? > Add hbase sink connector > > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase source connector for batch operation. > > In some cases, we need to save Streaming/Batch results into hbase. Just like > cassandra streaming/Batch sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10206) Add hbase sink connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598750#comment-16598750 ] Shimin Yang commented on FLINK-10206: - Thank you for your advice. I will briefly discuss the consistency and performance concerns here and work on the design document as soon as possible. There is an option in the Table Builder named enable buffer; when set, operations are buffered and flushed into hbase when the buffer is full. During a snapshot, the hbase sink flushes all buffered operations so that none are lost on failure. In general, it can provide an at-least-once guarantee. > Add hbase sink connector > > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase source connector for batch operation. > > In some cases, we need to save Streaming/Batch results into hbase. Just like > cassandra streaming/Batch sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
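The buffer-and-flush-on-snapshot pattern described in this comment can be sketched with a self-contained model. The class `BufferedSink` and its list-backed "store" are hypothetical stand-ins for illustration, not the proposed connector's actual code; the key point is that flushing inside the snapshot makes every record up to the checkpoint durable, which is what yields at-least-once semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a buffering sink with flush-on-checkpoint.
class BufferedSink {
    private final int capacity;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> store = new ArrayList<>(); // stands in for HBase

    BufferedSink(int capacity) { this.capacity = capacity; }

    void invoke(String record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush(); // size-triggered flush during normal operation
        }
    }

    // Called when a checkpoint snapshot is taken: after this returns, all
    // records received so far are durable, so a failure-and-restore can only
    // replay records, never lose them (at-least-once).
    void snapshotState() { flush(); }

    private void flush() {
        store.addAll(buffer);
        buffer.clear();
    }

    List<String> store() { return store; }
}
```

Replayed records after a restore may be written twice; with HBase, idempotent puts keyed by rowkey make that duplication harmless, which is one reason the at-least-once guarantee is often sufficient there.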
[jira] [Created] (FLINK-10248) Add HBase Table Sink
Shimin Yang created FLINK-10248: --- Summary: Add HBase Table Sink Key: FLINK-10248 URL: https://issues.apache.org/jira/browse/FLINK-10248 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Shimin Yang Assignee: Shimin Yang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10245) Add DataStream HBase Sink
Shimin Yang created FLINK-10245: --- Summary: Add DataStream HBase Sink Key: FLINK-10245 URL: https://issues.apache.org/jira/browse/FLINK-10245 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Shimin Yang Assignee: Shimin Yang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang closed FLINK-9567. -- > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.1, 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592501#comment-16592501 ] Shimin Yang commented on FLINK-10206: - Our streaming jobs rely heavily on hbase as a sink. I am willing to do the streaming part of the hbase sink. Can we split this into two subtasks, one for batch and one for streaming? > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase connector for batch operation. > > In some cases, we need to save Streaming result into hbase. Just like > cassandra streaming sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shimin Yang reassigned FLINK-10206: --- Assignee: Shimin Yang > Add hbase stream connector > -- > > Key: FLINK-10206 > URL: https://issues.apache.org/jira/browse/FLINK-10206 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Igloo >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.1 > > Original Estimate: 336h > Remaining Estimate: 336h > > Now, there is a hbase connector for batch operation. > > In some cases, we need to save Streaming result into hbase. Just like > cassandra streaming sink implementations. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9054) IllegalStateException: Buffer pool is destroyed
[ https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582054#comment-16582054 ] Shimin Yang commented on FLINK-9054: Could you please attach more logs? Feels like it's not enough to locate the bug. > IllegalStateException: Buffer pool is destroyed > --- > > Key: FLINK-9054 > URL: https://issues.apache.org/jira/browse/FLINK-9054 > Project: Flink > Issue Type: Bug > Components: Cluster Management, Configuration, Core >Affects Versions: 1.4.2 >Reporter: dhiraj prajapati >Priority: Blocker > Attachments: flink-conf.yaml > > > Hi, > I have a flink cluster running on 2 machines, say A and B. > Job manager is running on A. There are 2 TaksManagers, one on each node. > So effectively, A has a job manager and a task manager, while B has a task > manager. > When I submit a job to the cluster, I see below exception and the job fails: > 2018-03-22 17:16:52,205 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while > emitting latency marker. 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 10 more > Caused by: java.lang.RuntimeException: Buffer pool is destroyed. > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486) > ... 14 more > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138) > ... 19 more > > The exception does not come when I run only one JobManager (only on machine > B). >
[jira] [Created] (FLINK-10000) There's a mistake in the comments of CoGroupedStreams.java
Shimin Yang created FLINK-10000: --- Summary: There's a mistake in the comments of CoGroupedStreams.java Key: FLINK-10000 URL: https://issues.apache.org/jira/browse/FLINK-10000 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Shimin Yang Assignee: Shimin Yang The example of CoGroupedStream * DataStream result = one.coGroup(two) * .where(new MyFirstKeySelector()) * .equalTo(new MyFirstKeySelector()) * .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) * .apply(new MyCoGroupFunction()); * } The parameter of the equalTo method should be MySecondKeySelector instead of MyFirstKeySelector. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
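The reason `equalTo` needs a second, different key selector can be shown with a self-contained generic sketch (this is an illustration of the concept, not Flink's CoGroupedStreams API): `where()` extracts keys from elements of the first input and `equalTo()` from elements of the second, so the two selectors generally have different input types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Minimal co-group-by-key illustration: pairs elements of the two inputs
// whose (differently computed) keys are equal.
class CoGroupExample {
    static <A, B, K> List<String> coGroup(
            List<A> one, Function<A, K> where,      // key selector for the FIRST input
            List<B> two, Function<B, K> equalTo) {  // key selector for the SECOND input
        List<String> out = new ArrayList<>();
        for (A a : one) {
            for (B b : two) {
                if (where.apply(a).equals(equalTo.apply(b))) {
                    out.add(a + "+" + b);
                }
            }
        }
        return out;
    }
}
```

Passing the first input's selector to `equalTo`, as the mistaken Javadoc example does, would only typecheck by accident when both inputs happen to share a type.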
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516972#comment-16516972 ] Shimin Yang commented on FLINK-9567: Thanks for assigning the issue to me, I will do my best to fix it. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Assignee: Shimin Yang >Priority: Critical > Fix For: 1.6.0 > > Attachments: FlinkYarnProblem, fulllog.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515018#comment-16515018 ] Shimin Yang commented on FLINK-9567: I think I can fix the issue, but I cannot assign it to myself. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Critical > Attachments: FlinkYarnProblem, fulllog.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode
[ https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513390#comment-16513390 ] Shimin Yang commented on FLINK-9567: I modified the onContainerCompleted method and am running a test. > Flink does not release resource in Yarn Cluster mode > > > Key: FLINK-9567 > URL: https://issues.apache.org/jira/browse/FLINK-9567 > Project: Flink > Issue Type: Bug > Components: Cluster Management, YARN >Affects Versions: 1.5.0 >Reporter: Shimin Yang >Priority: Critical > Attachments: FlinkYarnProblem, fulllog.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)