[jira] [Commented] (FLINK-9749) Rework Bucketing Sink

2019-09-24 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937374#comment-16937374
 ] 

Shimin Yang commented on FLINK-9749:


Hi [~kkl0u], that would be nice. One more question is about Hadoop 2.6 support: 
I think we actually don't need the truncate method if we use 
OnCheckpointRollingPolicy.
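The reasoning behind that claim can be sketched with a small standalone model (the class and method names below are illustrative, not Flink's actual API): if every in-progress file is rolled, i.e. closed, on each checkpoint, then recovery never finds a file that was half-written across a checkpoint, so there is nothing to truncate back.

```java
// Toy model of why rolling on checkpoint removes the need for truncate():
// a file can be left partially written across a checkpoint only when the
// policy does NOT roll (close) in-progress files at checkpoint time.
public class RollOnCheckpointSketch {
    interface RollingPolicy {
        boolean shouldRollOnCheckpoint();
    }

    // Recovery needs truncate only if files may straddle a checkpoint.
    static boolean recoveryNeedsTruncate(RollingPolicy policy) {
        return !policy.shouldRollOnCheckpoint();
    }

    public static void main(String[] args) {
        RollingPolicy onCheckpoint = () -> true;  // behaves like OnCheckpointRollingPolicy
        RollingPolicy sizeBased = () -> false;    // behaves like a size/time-based policy
        System.out.println(recoveryNeedsTruncate(onCheckpoint)); // prints false
        System.out.println(recoveryNeedsTruncate(sizeBased));    // prints true
    }
}
```

This is why such a policy can make the sink work on Hadoop versions (like 2.6) whose file systems lack truncate support.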

> Rework Bucketing Sink
> -
>
> Key: FLINK-9749
> URL: https://issues.apache.org/jira/browse/FLINK-9749
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>Priority: Major
>
> The BucketingSink has a series of deficits at the moment.
> Due to the long list of issues, I would suggest adding a new 
> StreamingFileSink with a new and cleaner design.
> h3. Encoders, Parquet, ORC
>  - It only efficiently supports row-wise data formats (Avro, JSON, sequence 
> files).
>  - Efforts to add (columnar) compression for blocks of data are inefficient, 
> because blocks cannot span checkpoints due to persistence-on-checkpoint.
>  - The encoders are part of the {{flink-connector-filesystem}} project, 
> rather than living in orthogonal format projects. This blows up the dependencies of 
> the {{flink-connector-filesystem}} project. As an example, the 
> rolling file sink has dependencies on Hadoop and Avro, which messes up 
> dependency management.
> h3. Use of FileSystems
>  - The BucketingSink works only on Hadoop's FileSystem abstraction, does not 
> support Flink's own FileSystem abstraction, and cannot work with the packaged 
> S3, maprfs, and swift file systems.
>  - The sink hence needs Hadoop as a dependency.
>  - The sink relies on "trying out" whether truncation works, which requires 
> write access to the user's working directory.
>  - The sink relies on enumerating and counting files, rather than maintaining 
> its own state, making it less efficient.
> h3. Correctness and Efficiency on S3
>  - The BucketingSink relies on strong consistency in the file enumeration, 
> hence it may work incorrectly on S3.
>  - The BucketingSink relies on persisting streams at intermediate points. 
> This does not work properly on S3, hence there may be data loss on S3.
> h3. .valid-length companion file
>  - The valid-length file makes it hard for consumers of the data and should 
> be dropped.
> We track this design in a series of sub-issues.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-9749) Rework Bucketing Sink

2019-09-21 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935217#comment-16935217
 ] 

Shimin Yang edited comment on FLINK-9749 at 9/22/19 5:41 AM:
-

[~kkl0u], I understand your concern. However, if we use an event-time based 
bucket assigner, we do not need to introduce a new mechanism. Although the 
bucket is related to event time, it is just a special bucket that can take 
advantage of the timestamp and watermark info in BucketAssigner.Context.

Exactly-once semantics still need to be guaranteed by Flink. But instead of 
buffering all records in keyed state, an event-time bucket assigner only needs 
to track bucket status with operator state. And we still need to use 
StreamingFileSink to write to buckets (the bucket ID is obtained from the 
window) after messages are processed by the window operator.

Finally, I would like to explain why I brought up this discussion. Users want 
to use Flink to do streaming ETL from Kafka to Hive based on event time and 
automatically load partitions into a Hive partitioned table, which can 
eventually replace the batch ETL pipeline. The number of records is huge since 
it is all raw log messages, so using windows would be far too inefficient.

So we developed a Hive streaming sink based on a 2PC sink, using a mechanism 
quite similar to watermarks to decide when to load a partition into Hive. We 
also support more complex partitions, such as combining date-time and a key. 
Now we want to refactor it using StreamingFileSink.
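The core of such an event-time assigner is just deriving a bucket ID from the record's event timestamp. In Flink this would live inside a BucketAssigner implementation reading BucketAssigner.Context#timestamp(); the standalone sketch below shows only that date-formatting core (class and method names are illustrative, not Flink's):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative sketch: derive an hourly bucket ID from an event timestamp,
// the way an event-time DateTimeBucketAssigner might in getBucketId().
public class EventTimeBucketIdSketch {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    // In Flink, the timestamp would come from BucketAssigner.Context#timestamp()
    // rather than being passed in directly.
    static String bucketId(long eventTimestampMillis) {
        return FORMAT.format(Instant.ofEpochMilli(eventTimestampMillis));
    }

    public static void main(String[] args) {
        long ts = 1569130860000L; // 2019-09-22T05:41:00Z
        System.out.println(bucketId(ts)); // prints 2019-09-22--05
    }
}
```

The watermark from the same Context can then drive the decision that a bucket's hour has passed and its partition can be committed to Hive.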








[jira] [Commented] (FLINK-9749) Rework Bucketing Sink

2019-09-20 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934266#comment-16934266
 ] 

Shimin Yang commented on FLINK-9749:


Hi [~kkl0u], it does seem like replicating the window mechanism, but using the 
window operator to buffer records according to event time would actually cost 
too much state IO; using a bucket assigner based on event time instead would be 
much cheaper.



[jira] [Commented] (FLINK-9749) Rework Bucketing Sink

2019-09-20 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934185#comment-16934185
 ] 

Shimin Yang commented on FLINK-9749:


Hi [~kkl0u], is anyone working on an event-time based DateTimeBucketAssigner 
right now? 



[jira] [Commented] (FLINK-14003) Add access to customized state other than built in state types

2019-09-08 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925394#comment-16925394
 ] 

Shimin Yang commented on FLINK-14003:
-

[~carp84], sounds good.

> Add access to customized state other than built in state types
> --
>
> Key: FLINK-14003
> URL: https://issues.apache.org/jira/browse/FLINK-14003
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / State Backends
>Reporter: Shimin Yang
>Priority: Minor
>
> Currently the keyed state backend supports state types like value, list, and 
> map. Although they are quite useful, they may not fit all user cases. 
> Users can develop a customized state backend based on a different data store 
> as a plugin, but they do not have access to customized state if the type is 
> not one of the built-in types. 
> I propose to add a getPartitionedState method to the RuntimeContext and 
> KeyedStateStore interfaces to allow users to access their customized state; 
> it will not affect the current code path.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14003) Add access to customized state other than built in state types

2019-09-08 Thread Shimin Yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925116#comment-16925116
 ] 

Shimin Yang commented on FLINK-14003:
-

[~carp84], what do you think?



[jira] [Created] (FLINK-14003) Add access to customized state other than built in state types

2019-09-08 Thread Shimin Yang (Jira)
Shimin Yang created FLINK-14003:
---

 Summary: Add access to customized state other than built in state 
types
 Key: FLINK-14003
 URL: https://issues.apache.org/jira/browse/FLINK-14003
 Project: Flink
  Issue Type: Wish
  Components: Runtime / State Backends
Reporter: Shimin Yang


Currently the keyed state backend supports state types like value, list, and 
map. Although they are quite useful, they may not fit all user cases. 

Users can develop a customized state backend based on a different data store 
as a plugin, but they do not have access to customized state if the type is 
not one of the built-in types. 

I propose to add a getPartitionedState method to the RuntimeContext and 
KeyedStateStore interfaces to allow users to access their customized state; 
it will not affect the current code path.
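The proposal can be sketched as a generic create-or-get access path that is not restricted to the built-in value/list/map state types. The toy store below is purely illustrative: the names getPartitionedState and StateDescriptor echo Flink's existing ones, but this is not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy sketch of the proposed idea: a generic getPartitionedState that hands
// back whatever custom state object a descriptor describes.
public class CustomStateSketch {
    interface CustomState {}  // marker for user-defined state types

    static class StateDescriptor<S extends CustomState> {
        final String name;
        final Supplier<S> factory;
        StateDescriptor(String name, Supplier<S> factory) {
            this.name = name;
            this.factory = factory;
        }
    }

    static class KeyedStateStore {
        private final Map<String, CustomState> states = new HashMap<>();

        // Proposed generic access method: create on first access, then reuse.
        @SuppressWarnings("unchecked")
        <S extends CustomState> S getPartitionedState(StateDescriptor<S> desc) {
            return (S) states.computeIfAbsent(desc.name, n -> desc.factory.get());
        }
    }

    // Example user-defined state type that is none of the built-ins.
    static class BloomFilterState implements CustomState {
        int bits = 1024;
    }

    public static void main(String[] args) {
        KeyedStateStore store = new KeyedStateStore();
        StateDescriptor<BloomFilterState> d =
            new StateDescriptor<>("dedup-filter", BloomFilterState::new);
        BloomFilterState s1 = store.getPartitionedState(d);
        BloomFilterState s2 = store.getPartitionedState(d);
        System.out.println(s1 == s2); // prints true: same named state is returned
        System.out.println(s1.bits);  // prints 1024
    }
}
```

Because only a new method is added, existing code paths that request the built-in types would be unaffected.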





[jira] [Commented] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils

2019-02-21 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773851#comment-16773851
 ] 

Shimin Yang commented on FLINK-11705:
-

Hi [~till.rohrmann], yes, we will port the methods and inner classes that are 
still in use to Java.

> remove org.apache.flink.runtime.testingUtils.TestingUtils
> -
>
> Key: FLINK-11705
> URL: https://issues.apache.org/jira/browse/FLINK-11705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils

2019-02-20 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-11705:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-10392



[jira] [Created] (FLINK-11705) remove org.apache.flink.runtime.testingUtils.TestingUtils

2019-02-20 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-11705:
---

 Summary: remove org.apache.flink.runtime.testingUtils.TestingUtils
 Key: FLINK-11705
 URL: https://issues.apache.org/jira/browse/FLINK-11705
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base

2019-02-20 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773144#comment-16773144
 ] 

Shimin Yang commented on FLINK-11593:
-

Hi [~till.rohrmann], I have created a pull request for this issue. I hope this 
patch can make it into the upcoming release.

> Check & port TaskManagerTest to new code base
> -
>
> Key: FLINK-11593
> URL: https://issues.apache.org/jira/browse/FLINK-11593
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Check and port {{TaskManagerTest}} to new code base.





[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base

2019-02-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16771493#comment-16771493
 ] 

Shimin Yang commented on FLINK-11593:
-

Hi [~till.rohrmann], it will be finished before Friday.



[jira] [Commented] (FLINK-11593) Check & port TaskManagerTest to new code base

2019-02-15 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769112#comment-16769112
 ] 

Shimin Yang commented on FLINK-11593:
-

Hi [~till.rohrmann], thanks for the reminder. I am starting to work on this.



[jira] [Assigned] (FLINK-11593) Check & port TaskManagerTest to new code base

2019-02-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-11593:
---

Assignee: Shimin Yang



[jira] [Assigned] (FLINK-11364) Check and port TaskManagerFailsITCase to new code base if necessary

2019-01-31 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-11364:
---

Assignee: Shimin Yang

> Check and port TaskManagerFailsITCase to new code base if necessary
> ---
>
> Key: FLINK-11364
> URL: https://issues.apache.org/jira/browse/FLINK-11364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
>
> Check and port {{TaskManagerFailsITCase}} to new code base if necessary.





[jira] [Created] (FLINK-11486) Remove RecoveryITCase for jobmanager

2019-01-31 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-11486:
---

 Summary: Remove RecoveryITCase for jobmanager
 Key: FLINK-11486
 URL: https://issues.apache.org/jira/browse/FLINK-11486
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Shimin Yang
Assignee: Shimin Yang


The RecoveryITCase under the org.apache.flink.runtime.jobmanager package is 
based on the legacy mode and should be removed.





[jira] [Updated] (FLINK-10245) Add HBase Streaming Sink

2019-01-21 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10245:

Summary: Add HBase Streaming Sink  (was: Add DataStream HBase Sink)

> Add HBase Streaming Sink
> 
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Commented] (FLINK-11084) Incorrect ouput after two consecutive split and select

2018-12-13 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720304#comment-16720304
 ] 

Shimin Yang commented on FLINK-11084:
-

Hi [~dawidwys], thank you for providing the case! I agree with your idea. The 
SplitTransformation should be marked deprecated as well. Throwing a hard 
exception is more feasible.

 

> Incorrect ouput after two consecutive split and select
> --
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.2
>
>
> The second OutputSelector of two consecutive split-and-select operations does 
> not actually depend on the first one; both sit in the same flat array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}





[jira] [Commented] (FLINK-11084) Incorrect ouput after two consecutive split and select

2018-12-06 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712255#comment-16712255
 ] 

Shimin Yang commented on FLINK-11084:
-

Hi [~aljoscha], I don't agree with that. From an application developer's point 
of view, this may introduce dirty data and degrade data quality. Users might 
use this API in more complicated logic rather than calling two splits directly 
back to back.

For example:

SplitStream splitLog = dataStream.split(...)

DataStream logStream = splitLog.select(...)

DataStream errorStream = splitLog.select(...)

errorStream.map(...).filter(...).addSink(...)

SplitStream splitLogStream = logStream.split(...)

DataStream stream1 = splitLogStream.select(...).map(...).addSink(...)

DataStream stream2 = splitLogStream.select(...).map(...).addSink(...)

This will produce wrong data and be very hard to debug. As long as we still 
provide this API, we should make it correct. I have an implementation for 
this, and if that doesn't work out, we should at least throw an exception to 
tell users that two consecutive split-and-select operations are not supported.

 

> Incorrect ouput after two consecutive split and select
> --
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two consecutive split-and-select operations does 
> not actually depend on the first one; both sit in the same flat array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}





[jira] [Updated] (FLINK-11084) Incorrect ouput after two consecutive split and select

2018-12-05 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-11084:

Summary: Incorrect ouput after two consecutive split and select  (was: 
Incorrect ouput after two successive split and select)

> Incorrect ouput after two consecutive split and select
> --
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two consecutive split-and-select operations does 
> not actually depend on the first one; both sit in the same flat array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}





[jira] [Commented] (FLINK-11084) Incorrect ouput after two successive split and select

2018-12-05 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711041#comment-16711041
 ] 

Shimin Yang commented on FLINK-11084:
-

Currently I am creating an OutputSelectorWrapper that holds the current 
OutputSelector together with all preceding parent OutputSelectors and their 
selected names during StreamGraph generation. At runtime, every record then 
goes through all the OutputSelectors in the right order.
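The approach can be sketched in a few lines (a hypothetical Python model of the idea, not the actual Flink implementation): each wrapper carries its own selector plus the chain of parent selectors and selected names built during StreamGraph generation, and at runtime a record is forwarded only if it survives every stage in order.

```python
class OutputSelectorWrapper:
    """Hypothetical model: one wrapper per select() call, accumulating the
    whole (selector, selected_name) chain during graph generation."""

    def __init__(self, parent, selector, selected_name):
        parent_chain = parent.chain if parent is not None else []
        self.chain = parent_chain + [(selector, selected_name)]

    def accepts(self, record):
        # Run every selector in order: a record must be selected at each
        # stage, so the second split only ever sees the first split's output.
        return all(name in selector(record) for selector, name in self.chain)

# Models: dataStream.split(sel1).select("name2").split(sel2).select("name3")
sel1 = lambda r: ["name1"]   # the first split tags the record "name1"
sel2 = lambda r: ["name3"]   # the second split tags the record "name3"
stage1 = OutputSelectorWrapper(None, sel1, "name2")
stage2 = OutputSelectorWrapper(stage1, sel2, "name3")
print(stage2.accepts("StreamRecord1"))  # False: filtered out at stage 1
```

With the chain applied in order, the record is dropped at the first select, matching the expected (empty) result stream.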

> Incorrect ouput after two successive split and select
> -
>
> Key: FLINK-11084
> URL: https://issues.apache.org/jira/browse/FLINK-11084
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> The second OutputSelector of two successive split-and-select operations does 
> not actually depend on the first one; both sit in the same flat array of 
> OutputSelectors in DirectedOutput.
> For example:
> outputSelector1 => {"name1" or "name2"}
> outputSelector2 => {"name3" or "name4"}
> resultStream = 
> dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")
> expectedResult for input {StreamRecord1}:
> outputSelector1 => {"name1"}
> outputSelector2 => {"name3"}
> resultStream => {}
> actualResult:
> resultStream => {StreamRecord1}





[jira] [Created] (FLINK-11084) Incorrect ouput after two successive split and select

2018-12-05 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-11084:
---

 Summary: Incorrect ouput after two successive split and select
 Key: FLINK-11084
 URL: https://issues.apache.org/jira/browse/FLINK-11084
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Shimin Yang
Assignee: Shimin Yang
 Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1


The second OutputSelector of two successive split-and-select operations does 
not actually depend on the first one; both sit in the same flat array of 
OutputSelectors in DirectedOutput.

For example:

outputSelector1 => {"name1" or "name2"}

outputSelector2 => {"name3" or "name4"}

resultStream = 
dataStream.split(outputSelector1).select("name2").split(outputSelector2).select("name3")

expectedResult for input {StreamRecord1}:

outputSelector1 => {"name1"}

outputSelector2 => {"name3"}

resultStream => {}

actualResult:

resultStream => {StreamRecord1}
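The reported behavior can be reproduced outside Flink with a small simulation (a hypothetical Python sketch of the mechanism, not Flink code): in the flat-array model a record reaches the result stream if any selector in the array emits any of the finally selected names, whereas the intended chained semantics filter at every stage.

```python
def flat_forward(record, selectors, selected_names):
    # Buggy flat-array model (as in DirectedOutput): collect the names
    # emitted by ALL selectors and forward the record if any finally
    # selected name was emitted by ANY selector.
    emitted = set()
    for selector in selectors:
        emitted.update(selector(record))
    return bool(emitted & set(selected_names))

def chained_forward(record, stages):
    # Intended chained semantics: each select() filters on its own
    # selector's output before the next split runs.
    return all(name in selector(record) for selector, name in stages)

outputSelector1 = lambda r: ["name1"]   # tags the record "name1"
outputSelector2 = lambda r: ["name3"]   # tags the record "name3"

# dataStream.split(outputSelector1).select("name2")
#           .split(outputSelector2).select("name3")
record = "StreamRecord1"
print(flat_forward(record, [outputSelector1, outputSelector2], ["name3"]))  # True: the bug
print(chained_forward(record, [(outputSelector1, "name2"),
                               (outputSelector2, "name3")]))                # False: expected
```

The flat model forwards the record because outputSelector2 emits "name3", even though the first select("name2") should have dropped it.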





[jira] [Commented] (FLINK-11002) Trigger.clear() will not be called after purged window

2018-11-26 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698810#comment-16698810
 ] 

Shimin Yang commented on FLINK-11002:
-

Trigger#clear will eventually be called from onElement and onProcessingTime 
via the clearAllState method.

> Trigger.clear() will not be called after purged window
> --
>
> Key: FLINK-11002
> URL: https://issues.apache.org/jira/browse/FLINK-11002
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2
>Reporter: Yuan Yifan
>Priority: Major
>
> The `clear` method 
> (org.apache.flink.streaming.api.windowing.triggers.Trigger#clear) is not 
> called when `TriggerResult.FIRE_AND_PURGE` is returned from 
> onElement/onProcessingTime/onEventTime, which may leave the trigger unable 
> to clean up after the firing condition has been met.
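The lifecycle in question can be sketched with a toy window operator (a deliberate simplification of Flink's WindowOperator, not its actual code): FIRE_AND_PURGE drops the window contents immediately, while the trigger's own state is cleaned up later, when all window state is cleared.

```python
class CountTrigger:
    # Toy trigger: fires and purges every n elements; clear() marks
    # its own state as cleaned up.
    def __init__(self, n):
        self.n, self.count, self.cleared = n, 0, False

    def on_element(self, _):
        self.count += 1
        if self.count >= self.n:
            self.count = 0
            return "FIRE_AND_PURGE"
        return "CONTINUE"

    def clear(self):
        self.cleared = True

class ToyWindowOperator:
    # Simplified model: FIRE_AND_PURGE purges window contents only;
    # trigger.clear() runs later, in clear_all_state().
    def __init__(self, trigger):
        self.trigger = trigger
        self.contents = []

    def on_element(self, element):
        self.contents.append(element)
        if self.trigger.on_element(element) == "FIRE_AND_PURGE":
            self.contents.clear()        # purge: window contents only

    def clear_all_state(self):
        self.trigger.clear()             # trigger state cleaned up here

trigger = CountTrigger(2)
op = ToyWindowOperator(trigger)
op.on_element("a")
op.on_element("b")                       # fires and purges the contents
print(op.contents, trigger.cleared)      # [] False -> trigger state survives
op.clear_all_state()
print(trigger.cleared)                   # True
```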





[jira] [Updated] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-11-23 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10540:

Affects Version/s: (was: 1.8.0)
   1.7.0

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.8.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Updated] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-11-23 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10540:

Affects Version/s: (was: 1.7.0)
   1.8.0

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.8.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite

2018-11-17 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690520#comment-16690520
 ] 

Shimin Yang commented on FLINK-6756:


Well, I can take this over.

> Provide RichAsyncFunction to Scala API suite
> 
>
> Key: FLINK-6756
> URL: https://issues.apache.org/jira/browse/FLINK-6756
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Andrea Spina
>Assignee: Andrea Spina
>Priority: Major
>
> I can't find any tracking info about the chance to have RichAsyncFunction in 
> the Scala API suite. I think it'd be nice to have this function in order to 
> access the open/close methods and the RuntimeContext.
> I was able to retrieve 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593
>  only, so my question is whether there are blocking issues preventing this 
> feature. [~till.rohrmann]
> If it's possible and nobody has already done it, I can assign the issue to 
> myself and implement it.





[jira] [Commented] (FLINK-10815) Rethink the rescale operation, can we do it async

2018-11-07 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679185#comment-16679185
 ] 

Shimin Yang commented on FLINK-10815:
-

Thanks [~dawidwys]. I will close this issue.

> Rethink the rescale operation, can we do it async
> -
>
> Key: FLINK-10815
> URL: https://issues.apache.org/jira/browse/FLINK-10815
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, Scheduler
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Currently, the rescale operation stops the whole job and restarts it with a 
> different parallelism. This is costly and takes a long time to recover if the 
> state is large.
> A long rescale might also cause other problems, such as increased latency and 
> back pressure. In some settings, such as a streaming-computing cloud 
> service, users may be very sensitive to latency and resource usage, so it 
> would be better to make rescaling a cheaper operation.
> I wonder if we could make it an async operation, just like a checkpoint. 
> Dealing with the keyed state would be painful, though, so for now I make 
> some simplifying assumptions: the async rescale operation can only double 
> the parallelism or halve it.
> When scaling up, we can copy the state to the newly created worker and 
> change the partitioner of the upstream. The best timing might be upon 
> notification that a checkpoint has completed. Since we still need to change 
> the upstream partitioner, the upstream should buffer results or block 
> computation until the state copy has finished, and then the partitioner 
> should send different elements with the same key to the same downstream 
> operator.
> When scaling down, we can merge the keyed state of two operators and 
> likewise change the upstream partitioner.





[jira] [Closed] (FLINK-10815) Rethink the rescale operation, can we do it async

2018-11-07 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang closed FLINK-10815.
---
Resolution: Workaround

> Rethink the rescale operation, can we do it async
> -
>
> Key: FLINK-10815
> URL: https://issues.apache.org/jira/browse/FLINK-10815
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, Scheduler
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Currently, the rescale operation stops the whole job and restarts it with a 
> different parallelism. This is costly and takes a long time to recover if the 
> state is large.
> A long rescale might also cause other problems, such as increased latency and 
> back pressure. In some settings, such as a streaming-computing cloud 
> service, users may be very sensitive to latency and resource usage, so it 
> would be better to make rescaling a cheaper operation.
> I wonder if we could make it an async operation, just like a checkpoint. 
> Dealing with the keyed state would be painful, though, so for now I make 
> some simplifying assumptions: the async rescale operation can only double 
> the parallelism or halve it.
> When scaling up, we can copy the state to the newly created worker and 
> change the partitioner of the upstream. The best timing might be upon 
> notification that a checkpoint has completed. Since we still need to change 
> the upstream partitioner, the upstream should buffer results or block 
> computation until the state copy has finished, and then the partitioner 
> should send different elements with the same key to the same downstream 
> operator.
> When scaling down, we can merge the keyed state of two operators and 
> likewise change the upstream partitioner.





[jira] [Comment Edited] (FLINK-10815) Rethink the rescale operation, can we do it async

2018-11-07 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678456#comment-16678456
 ] 

Shimin Yang edited comment on FLINK-10815 at 11/7/18 4:19 PM:
--

[~till.rohrmann], I would like to hear your opinion before diving into more 
detail, since I am not very sure whether this is a worthwhile feature to work 
on. Currently, it's more of a proposal.


was (Author: dangdangdang):
[~till.rohrmann], I would like to hear your opinion before diving into more 
detail, since I am not very sure whether this is a worthwhile feature to work 
on.

> Rethink the rescale operation, can we do it async
> -
>
> Key: FLINK-10815
> URL: https://issues.apache.org/jira/browse/FLINK-10815
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, Scheduler
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Currently, the rescale operation stops the whole job and restarts it with a 
> different parallelism. This is costly and takes a long time to recover if the 
> state is large.
> A long rescale might also cause other problems, such as increased latency and 
> back pressure. In some settings, such as a streaming-computing cloud 
> service, users may be very sensitive to latency and resource usage, so it 
> would be better to make rescaling a cheaper operation.
> I wonder if we could make it an async operation, just like a checkpoint. 
> Dealing with the keyed state would be painful, though, so for now I make 
> some simplifying assumptions: the async rescale operation can only double 
> the parallelism or halve it.
> When scaling up, we can copy the state to the newly created worker and 
> change the partitioner of the upstream. The best timing might be upon 
> notification that a checkpoint has completed. Since we still need to change 
> the upstream partitioner, the upstream should buffer results or block 
> computation until the state copy has finished, and then the partitioner 
> should send different elements with the same key to the same downstream 
> operator.
> When scaling down, we can merge the keyed state of two operators and 
> likewise change the upstream partitioner.





[jira] [Commented] (FLINK-10815) Rethink the rescale operation, can we do it async

2018-11-07 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678456#comment-16678456
 ] 

Shimin Yang commented on FLINK-10815:
-

[~till.rohrmann], I would like to hear your opinion before diving into more 
detail, since I am not very sure whether this is a worthwhile feature to work 
on.

> Rethink the rescale operation, can we do it async
> -
>
> Key: FLINK-10815
> URL: https://issues.apache.org/jira/browse/FLINK-10815
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager, Scheduler
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Currently, the rescale operation stops the whole job and restarts it with a 
> different parallelism. This is costly and takes a long time to recover if the 
> state is large.
> A long rescale might also cause other problems, such as increased latency and 
> back pressure. In some settings, such as a streaming-computing cloud 
> service, users may be very sensitive to latency and resource usage, so it 
> would be better to make rescaling a cheaper operation.
> I wonder if we could make it an async operation, just like a checkpoint. 
> Dealing with the keyed state would be painful, though, so for now I make 
> some simplifying assumptions: the async rescale operation can only double 
> the parallelism or halve it.
> When scaling up, we can copy the state to the newly created worker and 
> change the partitioner of the upstream. The best timing might be upon 
> notification that a checkpoint has completed. Since we still need to change 
> the upstream partitioner, the upstream should buffer results or block 
> computation until the state copy has finished, and then the partitioner 
> should send different elements with the same key to the same downstream 
> operator.
> When scaling down, we can merge the keyed state of two operators and 
> likewise change the upstream partitioner.





[jira] [Created] (FLINK-10815) Rethink the rescale operation, can we do it async

2018-11-07 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10815:
---

 Summary: Rethink the rescale operation, can we do it async
 Key: FLINK-10815
 URL: https://issues.apache.org/jira/browse/FLINK-10815
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager, Scheduler
Reporter: Shimin Yang
Assignee: Shimin Yang


Currently, the rescale operation stops the whole job and restarts it with a 
different parallelism. This is costly and takes a long time to recover if the 
state is large.

A long rescale might also cause other problems, such as increased latency and 
back pressure. In some settings, such as a streaming-computing cloud service, 
users may be very sensitive to latency and resource usage, so it would be 
better to make rescaling a cheaper operation.

I wonder if we could make it an async operation, just like a checkpoint. 
Dealing with the keyed state would be painful, though, so for now I make some 
simplifying assumptions: the async rescale operation can only double the 
parallelism or halve it.

When scaling up, we can copy the state to the newly created worker and change 
the partitioner of the upstream. The best timing might be upon notification 
that a checkpoint has completed. Since we still need to change the upstream 
partitioner, the upstream should buffer results or block computation until the 
state copy has finished, and then the partitioner should send different 
elements with the same key to the same downstream operator.

When scaling down, we can merge the keyed state of two operators and likewise 
change the upstream partitioner.
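One reason the double-or-halve restriction simplifies things can be illustrated with plain modulo partitioning (a deliberate simplification; Flink actually assigns keys via key groups): after doubling from p to 2p partitions, every key from old partition i lands in exactly one of the two new partitions i or i + p, so each new worker needs state from exactly one old worker.

```python
# After doubling the partition count, k % (2p) is always either
# k % p or k % p + p, so new workers j and j + p both derive their
# state from old worker j alone -- a copy-then-filter scheme suffices.

def old_partition(key, p):
    return hash(key) % p

def new_partition(key, p):
    return hash(key) % (2 * p)

p = 4
for key in range(1000):
    i, j = old_partition(key, p), new_partition(key, p)
    assert j in (i, i + p)   # new partition always derives from the old one
print("every key's new partition is i or i + p")
```

The same argument runs in reverse for halving, which is why merging the keyed state of exactly two operators works when scaling down.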





[jira] [Commented] (FLINK-10703) Race condition in YarnResourceManager

2018-11-04 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674615#comment-16674615
 ] 

Shimin Yang commented on FLINK-10703:
-

Thanks [~till.rohrmann]. I didn't notice that runAsync is actually executed by 
the main thread. I will close this issue then.
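The resolution rests on Flink's actor-style threading model: runAsync schedules the closure onto the RPC endpoint's single main thread, so all mutations of numPendingContainerRequests are serialized and no atomic is needed. A rough Python analogue (illustrative only, not Flink code):

```python
from concurrent.futures import ThreadPoolExecutor

class Endpoint:
    # All state mutations run on ONE executor thread, mirroring how
    # runAsync schedules work onto the RPC main thread: no two mutations
    # ever race, so the counter needs no atomics or locks.
    def __init__(self):
        self._main = ThreadPoolExecutor(max_workers=1)
        self.num_pending_container_requests = 0

    def run_async(self, fn):
        return self._main.submit(fn)

    def increment(self):
        return self.run_async(
            lambda: setattr(self, "num_pending_container_requests",
                            self.num_pending_container_requests + 1))

ep = Endpoint()
futures = [ep.increment() for _ in range(1000)]
for f in futures:
    f.result()                 # wait for all scheduled mutations
print(ep.num_pending_container_requests)  # 1000: no lost updates
ep._main.shutdown()
```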

> Race condition in YarnResourceManager
> -
>
> Key: FLINK-10703
> URL: https://issues.apache.org/jira/browse/FLINK-10703
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Race condition on numPendingContainerRequests; this instance variable should 
> be atomic.





[jira] [Closed] (FLINK-10703) Race condition in YarnResourceManager

2018-11-04 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang closed FLINK-10703.
---
Resolution: Not A Problem

> Race condition in YarnResourceManager
> -
>
> Key: FLINK-10703
> URL: https://issues.apache.org/jira/browse/FLINK-10703
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Race condition on numPendingContainerRequests; this instance variable should 
> be atomic.





[jira] [Created] (FLINK-10703) Race condition in YarnResourceManager

2018-10-28 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10703:
---

 Summary: Race condition in YarnResourceManager
 Key: FLINK-10703
 URL: https://issues.apache.org/jira/browse/FLINK-10703
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: Shimin Yang
Assignee: Shimin Yang


Race condition on numPendingContainerRequests; this instance variable should 
be atomic.





[jira] [Assigned] (FLINK-10503) Periodically check for new resources

2018-10-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10503:
---

Assignee: Shimin Yang

> Periodically check for new resources
> 
>
> Key: FLINK-10503
> URL: https://issues.apache.org/jira/browse/FLINK-10503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to decide when to start scheduling or to rescale, we need to 
> periodically check for new resources (slots).





[jira] [Assigned] (FLINK-10501) Obtain resource overview of cluster

2018-10-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10501:
---

Assignee: Shimin Yang

> Obtain resource overview of cluster
> ---
>
> Key: FLINK-10501
> URL: https://issues.apache.org/jira/browse/FLINK-10501
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to decide with which parallelism to run, the 
> {{ExecutionGraphDriver}} needs to obtain an overview over all available 
> resources. This includes the resources managed by the {{SlotPool}} as well as 
> not-yet-allocated resources on the {{ResourceManager}}. This is a temporary 
> workaround until we adapt the slot allocation protocol to support resource 
> declaration. Once that is done, we will only take the {{SlotPool's}} slots 
> into account.





[jira] [Assigned] (FLINK-10500) Let ExecutionGraphDriver react to fail signal

2018-10-23 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10500:
---

Assignee: Shimin Yang

> Let ExecutionGraphDriver react to fail signal
> -
>
> Key: FLINK-10500
> URL: https://issues.apache.org/jira/browse/FLINK-10500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to scale down when there are not enough resources available or if 
> TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. 
> Depending on the failure type and the available set of resources, it can then 
> decide to scale the job down or simply restart. In the scope of this issue, 
> the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}.





[jira] [Commented] (FLINK-10618) Introduce catalog for Flink tables

2018-10-23 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661688#comment-16661688
 ] 

Shimin Yang commented on FLINK-10618:
-

Sounds good. Do you have a draft of the design doc?

> Introduce catalog for Flink tables
> --
>
> Key: FLINK-10618
> URL: https://issues.apache.org/jira/browse/FLINK-10618
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL Client
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> Besides meta objects such as tables that may come from an 
> {{ExternalCatalog}}, Flink also deals with tables/views/functions that are 
> created on the fly (in memory) or specified in a configuration file. Those 
> objects don't belong to any {{ExternalCatalog}}, yet Flink either stores 
> them in memory, where they are non-persistent, or recreates them from a 
> file, which is a big pain for the user. Those objects are only known to 
> Flink, but Flink manages them poorly.
> Since they are typical objects in a database catalog, it's natural to have a 
> catalog that manages them. The interface will be similar to 
> {{ExternalCatalog}}, which contains meta objects that are not managed by 
> Flink. There are several possible implementations of the Flink internal 
> catalog interface: memory, file, external registry (such as the Confluent 
> schema registry or the Hive metastore), relational database, etc.
> The initial functionality as well as the catalog hierarchy could be very 
> simple. The basic functionality of the catalog will mostly be creating, 
> altering, and dropping tables, views, functions, etc. Obviously, this can 
> evolve over time.
> We plan to provide implementations backed by memory, file, and the Hive 
> metastore, plugged in at the SQL Client layer.
> Please provide your feedback.





[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235
 ] 

Shimin Yang edited comment on FLINK-10540 at 10/19/18 3:37 AM:
---

Looks like it's only used by the flink-storm example in production code. Better 
to start after storm is removed [~Tison].


was (Author: dangdangdang):
Looks like it's only used by the flink-storm example. Better to start after 
storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656235#comment-16656235
 ] 

Shimin Yang commented on FLINK-10540:
-

Looks like it's only used by the flink-storm example. Better to start after 
storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Assigned] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10540:
---

Assignee: Shimin Yang

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-16 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651276#comment-16651276
 ] 

Shimin Yang commented on FLINK-10540:
-

Hi [~Tison], I would love to take this if you haven't worked on it.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10552) Provide RichAsyncFunction for scala API

2018-10-15 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10552:

Issue Type: Improvement  (was: Bug)

> Provide RichAsyncFunction for scala API
> ---
>
> Key: FLINK-10552
> URL: https://issues.apache.org/jira/browse/FLINK-10552
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>
> Currently, only the Java API provides a RichAsyncFunction abstract class while 
> the Scala API does not. It would be nice to provide the same functionality for 
> the Scala API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils

2018-10-15 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10511:

Issue Type: Improvement  (was: Bug)

> Code duplication of creating RPC service in ClusterEntrypoint and 
> AkkaRpcServiceUtils
> -
>
> Key: FLINK-10511
> URL: https://issues.apache.org/jira/browse/FLINK-10511
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>
> The TaskManagerRunner uses AkkaRpcServiceUtils to create the RPC service, but 
> the ClusterEntrypoint uses a protected method to do the same job. I think it's 
> better to use the same method in AkkaRpcServiceUtils to reuse the code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10533) job parallelism equals task slots number but not use the same number of the task slots as the parallelism

2018-10-15 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649879#comment-16649879
 ] 

Shimin Yang commented on FLINK-10533:
-

Flink uses slot sharing groups to optimize resource usage and allows subtasks of 
the same job to share a slot. Sharing is based on tasks, not on the job graph. If 
you want to use four slots, you can set the parallelism to 4. Please refer to 
[https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#task-slots-and-resources]
 for more info. If this resolves your confusion, I will close this 
issue.
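A minimal sketch (Python, purely illustrative) of the rule described above: with the default slot sharing group, one slot can host one subtask of each different task, so the number of slots a job needs equals the highest task parallelism, not the sum of the parallelisms.

```python
def required_slots(task_parallelisms):
    """With default slot sharing, a slot can host one subtask of each
    task, so a job needs max(parallelism) slots rather than the sum."""
    return max(task_parallelisms) if task_parallelisms else 0

# The reporter's job: two tasks, each with parallelism 2. Subtasks of
# different tasks share slots, so only 2 of the 4 available slots are used.
print(required_slots([2, 2]))  # -> 2

# Setting the job parallelism to 4 would occupy all four slots.
print(required_slots([4, 4]))  # -> 4
```

This is why the job in the report occupies only two slots even though four are available.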

> job parallelism equals task slots number but not use the same number of the 
> task slots as the parallelism
> -
>
> Key: FLINK-10533
> URL: https://issues.apache.org/jira/browse/FLINK-10533
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.3, 1.6.0, 1.6.1
>Reporter: sean.miao
>Priority: Major
> Attachments: image-2018-10-12-10-35-57-443.png, 
> image-2018-10-12-10-36-13-503.png
>
>
> I use the Table API and do not use the DataStream API.
>  
> My job has two graphs and the parallelism of each is two, so the total 
> parallelism is four;
> but if I give my job four slots, it only uses two of them.
>  
> !image-2018-10-12-10-36-13-503.png!
> !image-2018-10-12-10-35-57-443.png!
>  
> thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10552) Provide RichAsyncFunction for scala API

2018-10-15 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10552:
---

 Summary: Provide RichAsyncFunction for scala API
 Key: FLINK-10552
 URL: https://issues.apache.org/jira/browse/FLINK-10552
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Reporter: Shimin Yang
Assignee: Shimin Yang


Currently, only the Java API provides a RichAsyncFunction abstract class while the 
Scala API does not. It would be nice to provide the same functionality for the Scala API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils

2018-10-12 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647692#comment-16647692
 ] 

Shimin Yang commented on FLINK-10511:
-

[~till.rohrmann], good point. We can add an overloaded createRpcService function 
that takes a port range as a parameter and uses BootstrapTools to start an actor 
system in AkkaRpcServiceUtils.

> Code duplication of creating RPC service in ClusterEntrypoint and 
> AkkaRpcServiceUtils
> -
>
> Key: FLINK-10511
> URL: https://issues.apache.org/jira/browse/FLINK-10511
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>
> The TaskManagerRunner uses AkkaRpcServiceUtils to create the RPC service, but 
> the ClusterEntrypoint uses a protected method to do the same job. I think it's 
> better to use the same method in AkkaRpcServiceUtils to reuse the code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-10-09 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10166:
---

Assignee: (was: Shimin Yang)

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.7.0
>
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10500) Let ExecutionGraphDriver react to fail signal

2018-10-09 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643038#comment-16643038
 ] 

Shimin Yang commented on FLINK-10500:
-

Hi [~till.rohrmann], have you started working on this? I would love to take it 
on after the ExecutionGraph is decoupled from the JobMaster.

> Let ExecutionGraphDriver react to fail signal
> -
>
> Key: FLINK-10500
> URL: https://issues.apache.org/jira/browse/FLINK-10500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to scale down when there are not enough resources available or if 
> TMs died, the {{ExecutionGraphDriver}} needs to learn about a failure. 
> Depending on the failure type and the available set of resources, it can then 
> decide to scale the job down or simply restart. In the scope of this issue, 
> the {{ExecutionGraphDriver}} should simply call into the {{RestartStrategy}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10511) Code duplication of creating RPC service in ClusterEntrypoint and AkkaRpcServiceUtils

2018-10-08 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10511:
---

 Summary: Code duplication of creating RPC service in 
ClusterEntrypoint and AkkaRpcServiceUtils
 Key: FLINK-10511
 URL: https://issues.apache.org/jira/browse/FLINK-10511
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: Shimin Yang
Assignee: Shimin Yang


The TaskManagerRunner uses AkkaRpcServiceUtils to create the RPC service, but 
the ClusterEntrypoint uses a protected method to do the same job. I think it's 
better to use the same method in AkkaRpcServiceUtils to reuse the code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621468#comment-16621468
 ] 

Shimin Yang commented on FLINK-10245:
-

Hi [~hequn8128], I have updated the document; we can discuss how to implement 
the merge there.

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

2018-09-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620386#comment-16620386
 ] 

Shimin Yang commented on FLINK-10333:
-

Hi [~till.rohrmann], is there a discussion thread for this issue?

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, 
> CompletedCheckpoints)
> -
>
> Key: FLINK-10333
> URL: https://issues.apache.org/jira/browse/FLINK-10333
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> While going over the ZooKeeper based stores 
> ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, 
> {{ZooKeeperCompletedCheckpointStore}}) and the underlying 
> {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were 
> introduced with past incremental changes.
> * Depending on whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} 
> or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization 
> problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of 
> exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case 
> of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be 
> better to move {{RetrievableStateStorageHelper}} out of it for a better 
> separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even 
> if it is locked. This should not happen since it could leave another system 
> in an inconsistent state (imagine a changed {{JobGraph}} which restores from 
> an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} 
> which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. 
> Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10000) There's a mistake in the comments of CoGroupedStreams.java

2018-09-19 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang closed FLINK-10000.
---
Resolution: Won't Fix

> There's a mistake in the comments of CoGroupedStreams.java
> --
>
> Key: FLINK-10000
> URL: https://issues.apache.org/jira/browse/FLINK-10000
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>  Labels: pull-request-available
>
> The example of {{CoGroupedStream}}
> {code}
> DataStream result = one.coGroup(two)
>   .where(new MyFirstKeySelector())
>   .equalTo(new MyFirstKeySelector())
>   .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>   .apply(new MyCoGroupFunction());
>  {code}
> the parameter of the {{equalTo}} method should be {{MySecondKeySelector}} instead of 
> {{MyFirstKeySelector}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10337) Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore

2018-09-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620357#comment-16620357
 ] 

Shimin Yang commented on FLINK-10337:
-

[~till.rohrmann], sounds reasonable. I will follow the parent issue.

> Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore
> 
>
> Key: FLINK-10337
> URL: https://issues.apache.org/jira/browse/FLINK-10337
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> Use {{ZooKeeperStateStore}} in {{ZooKeeperCompletedCheckpointStore}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620356#comment-16620356
 ] 

Shimin Yang commented on FLINK-10344:
-

Hi [~SleePy], is there any relation between this issue and your attachment?

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: Tornado.pdf, Tornado.pdf
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(
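The periodic-check alternative described above amounts to a reconciliation loop that compares the jobs the dispatcher believes are running against the jobs currently in the store. A minimal sketch (Python, with hypothetical names; the real store is ZooKeeper-backed):

```python
def reconcile(known_jobs, store_jobs):
    """Compare the jobs we believe are running against the jobs currently
    present in the store; return (newly added, removed) job IDs."""
    known, stored = set(known_jobs), set(store_jobs)
    return stored - known, known - stored

# One polling iteration: job-c appeared in the store, job-a is gone
# (e.g. it finished and is being cleaned up).
added, removed = reconcile({"job-a", "job-b"}, {"job-b", "job-c"})
# added == {"job-c"}, removed == {"job-a"}
```

Because each iteration compares full snapshots, a spurious notification cannot produce a false positive: a job is only recovered if it is actually present in the store at poll time.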



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10206) Add hbase sink connector

2018-09-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620069#comment-16620069
 ] 

Shimin Yang commented on FLINK-10206:
-

[~hequn8128] [~twalthr] [~till.rohrmann] Hi all, I have updated the design 
document. I made some proposals on how to implement the buffer and still need 
some advice. Could you review it when you have time?

> Add hbase sink connector
> 
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is an HBase source connector for batch operations. 
>  
> In some cases, we need to save streaming/batch results into HBase, just like 
> the Cassandra streaming/batch sink implementations. 
>  
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10206) Add hbase sink connector

2018-09-18 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10206:

Description: 
Now, there is an HBase source connector for batch operations. 

 

In some cases, we need to save streaming/batch results into HBase, just like 
the Cassandra streaming/batch sink implementations. 

 
Design documentation: 
[https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]

 

  was:
Now, there is a hbase source connector for batch operation. 

 

In some cases, we need to save Streaming/Batch results into hbase.  Just like 
cassandra streaming/Batch sink implementations. 

 


> Add hbase sink connector
> 
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is an HBase source connector for batch operations. 
>  
> In some cases, we need to save streaming/batch results into HBase, just like 
> the Cassandra streaming/batch sink implementations. 
>  
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618688#comment-16618688
 ] 

Shimin Yang commented on FLINK-10245:
-

[~hequn8128] Sounds good.

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618686#comment-16618686
 ] 

Shimin Yang commented on FLINK-10245:
-

Hi [~yuanoOo], I think the Increment operation should be in the business logic, 
not the sink. If we retrieve a value from HBase and then increment it by one, it 
will violate consistency during recovery. For the PV scenario, I suggest using 
the count in the Flink API and writing to HBase in an overwrite fashion.
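The consistency argument can be illustrated with a toy replay simulation (Python, illustrative only, with hypothetical names): after a failure, an at-least-once sink replays records, so a read-modify-write Increment double-counts them, while overwriting with a count maintained in Flink state is idempotent.

```python
def apply_rmw(table, records):
    # Read-modify-write: every record increments the stored value,
    # so replayed records are counted twice.
    for key in records:
        table[key] = table.get(key, 0) + 1

def apply_overwrite(table, counts):
    # Overwrite with a count computed inside Flink: idempotent,
    # replaying the same snapshot leaves the table unchanged.
    for key, count in counts.items():
        table[key] = count

rmw = {}
apply_rmw(rmw, ["page1", "page1"])
apply_rmw(rmw, ["page1", "page1"])  # replay after recovery
# rmw["page1"] == 4 -- double counted

ow = {}
apply_overwrite(ow, {"page1": 2})
apply_overwrite(ow, {"page1": 2})  # replay after recovery
# ow["page1"] == 2 -- still correct
```

This is why keeping the counter in Flink state and emitting overwrites (Put) rather than Increments keeps the HBase table consistent across restarts.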

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-17 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10344:
---

Assignee: Shimin Yang

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10337) Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore

2018-09-14 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10337:
---

Assignee: Shimin Yang

> Use ZooKeeperStateStore in ZooKeeperCompletedCheckpointStore
> 
>
> Key: FLINK-10337
> URL: https://issues.apache.org/jira/browse/FLINK-10337
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> Use {{ZooKeeperStateStore}} in {{ZooKeeperCompletedCheckpointStore}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-09-13 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613318#comment-16613318
 ] 

Shimin Yang commented on FLINK-10166:
-

Hi [~twalthr]. Well, not many times. Maybe we could add a hint in the log 
instead of throwing an exception with the message "Table program cannot be 
compiled. This is a bug. Please file an issue.".

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Shimin Yang
>Priority: Blocker
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-09-13 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613318#comment-16613318
 ] 

Shimin Yang edited comment on FLINK-10166 at 9/13/18 10:55 AM:
---

Hi [~twalthr]. Well, not many times. Maybe we could add a hint in the log 
instead of just throwing an exception with the message "Table program cannot be 
compiled. This is a bug. Please file an issue.".


was (Author: dangdangdang):
Hi [~twalthr]. Well, not many times. Maybe we could add some hint in the log 
instead of throw a exception with message "Table program cannot be compiled. 
This is a bug. Please file an issue.".

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Shimin Yang
>Priority: Blocker
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-09-13 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10166:
---

Assignee: Shimin Yang

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Shimin Yang
>Priority: Blocker
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-09-13 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613136#comment-16613136
 ] 

Shimin Yang commented on FLINK-10166:
-

Hi [~dawidwys] [~StephanEwen], I think a more elegant way to solve this problem 
is to not relocate commons-codec. What do you think?

> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10330) Code Compilation failed in local mode

2018-09-13 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang closed FLINK-10330.
---
Resolution: Duplicate

> Code Compilation failed in local mode
> -
>
> Key: FLINK-10330
> URL: https://issues.apache.org/jira/browse/FLINK-10330
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1, 1.5.4
>
> Attachments: error log
>
>
> The Janino compiler in flink-table cannot find the type Base64 from commons-codec.
> [^error log]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10330) Code Compilation failed in local mode

2018-09-13 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10330:
---

 Summary: Code Compilation failed in local mode
 Key: FLINK-10330
 URL: https://issues.apache.org/jira/browse/FLINK-10330
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.6.0, 1.5.0
Reporter: Shimin Yang
Assignee: Shimin Yang
 Fix For: 1.6.1, 1.5.4
 Attachments: error log

The Janino compiler in flink-table cannot find the type Base64 from commons-codec.

[^error log]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10327:
---

Assignee: Piotr Nowojski  (was: Shimin Yang)

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would provide a way to 
> perform actions on watermark advancement, such as state clean-up or emitting 
> results after accumulating data.
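The proposal above amounts to the operator forwarding its processWatermark call into a user-visible hook on the function. A language-neutral sketch (Python, hypothetical names, not the actual Flink API):

```python
class ProcessFunction:
    """A user function with a hypothetical watermark hook."""
    def __init__(self):
        self.state = {}
        self.emitted = []

    def process_element(self, key, value):
        self.state[key] = value

    def on_watermark(self, watermark):
        # With the hook, the function can emit accumulated results
        # and clean up its state when the watermark advances.
        self.emitted.append((watermark, dict(self.state)))
        self.state.clear()

class ProcessOperator:
    """The operator forwards processWatermark into the function."""
    def __init__(self, fn):
        self.fn = fn

    def process_watermark(self, mark):
        self.fn.on_watermark(mark)  # the forwarding this issue proposes

fn = ProcessFunction()
op = ProcessOperator(fn)
fn.process_element("k", 1)
op.process_watermark(10)
# fn.emitted == [(10, {"k": 1})], and the state has been cleared
```

Without the forwarded call, the function only ever sees elements and has no signal that event time has advanced.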



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612278#comment-16612278
 ] 

Shimin Yang commented on FLINK-10327:
-

Sounds good, I'll work on it.

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Shimin Yang
>Priority: Major
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would provide a way to 
> perform actions on watermark advancement, such as state clean-up or emitting 
> results after accumulating data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10327:
---

Assignee: Shimin Yang

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Shimin Yang
>Priority: Major
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would provide a way to 
> perform actions on watermark advancement, such as state clean-up or emitting 
> results after accumulating data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager

2018-09-12 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612017#comment-16612017
 ] 

Shimin Yang commented on FLINK-10322:
-

Cool.

> Unused instance variable MetricRegistry in ResourceManager
> --
>
> Key: FLINK-10322
> URL: https://issues.apache.org/jira/browse/FLINK-10322
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>
> Same as the title.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager

2018-09-12 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611714#comment-16611714
 ] 

Shimin Yang commented on FLINK-10322:
-

Hi [~Tison], do you have any suggestions about what to add to the metrics of the RM?

> Unused instance variable MetricRegistry in ResourceManager
> --
>
> Key: FLINK-10322
> URL: https://issues.apache.org/jira/browse/FLINK-10322
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Minor
>
> Same as the title.





[jira] [Created] (FLINK-10322) Unused instance variable MetricRegistry in ResourceManager

2018-09-11 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10322:
---

 Summary: Unused instance variable MetricRegistry in ResourceManager
 Key: FLINK-10322
 URL: https://issues.apache.org/jira/browse/FLINK-10322
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Reporter: Shimin Yang
Assignee: Shimin Yang


Same as the title.





[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-11 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611491#comment-16611491
 ] 

Shimin Yang commented on FLINK-10245:
-

[~hequn8128], we could make deduplication an optimization in something like an 
append mode, or an update mode without nullable fields.

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-11 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611487#comment-16611487
 ] 

Shimin Yang commented on FLINK-10245:
-

Hi [~hequn8128], I have been thinking about whether we should allow nullable 
fields. If nullable fields are allowed, the mutations cannot be deduplicated by 
rowkey. Meanwhile, I also think deduplication is not suitable for the Retract or 
Upsert Table Sink that I will implement in the future.

But keeping a memory buffer and using the batch API might be a good idea. I will 
try to work on it.
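For illustration, deduplication by rowkey in an append-style buffer might look 
like this (a hypothetical sketch; as noted above, it breaks down once 
retractions or nullable fields are involved):

```python
# Sketch of rowkey deduplication in an append-style buffer: only the
# latest mutation per rowkey is kept, so upstream updates collapse.
# Illustrative only -- unsafe for retract/upsert semantics.

def deduplicate_by_rowkey(mutations):
    latest = {}
    for rowkey, payload in mutations:
        latest[rowkey] = payload        # later mutation wins
    return list(latest.items())

buffered = [("row1", {"v": 1}), ("row2", {"v": 2}), ("row1", {"v": 3})]
print(deduplicate_by_rowkey(buffered))
# -> [('row1', {'v': 3}), ('row2', {'v': 2})]
```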

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-10 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16609013#comment-16609013
 ] 

Shimin Yang commented on FLINK-10247:
-

[~till.rohrmann] It could be a good way to decouple the MetricQueryService from 
other components, but I am not sure whether there is any way to configure the 
priority of these two actor systems.

> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.
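The dedicated-thread-pool idea can be sketched roughly as follows (a toy model; 
the class name and methods are assumptions, not Flink's real MetricQueryService):

```python
# Sketch: give the metric query service its own executor so slow metric
# dumps cannot block the main components' threads. Illustrative only.
from concurrent.futures import ThreadPoolExecutor

class MetricQueryService:
    def __init__(self):
        # dedicated single-thread pool, separate from the main components
        self.executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="metric-query")
        self.metrics = {}

    def update(self, name, value):
        # mutation happens on the dedicated thread
        self.executor.submit(self.metrics.__setitem__, name, value)

    def query(self, name):
        # queries also run on the dedicated thread; caller just waits
        return self.executor.submit(self.metrics.get, name).result()

svc = MetricQueryService()
svc.update("numRecordsIn", 42)
print(svc.query("numRecordsIn"))  # -> 42 (single worker runs tasks in order)
```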





[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-09 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608642#comment-16608642
 ] 

Shimin Yang commented on FLINK-10245:
-

Hi [~hequn8128],

For the comments you mentioned last time, I looked into the HBase client 
implementation and think I can add a scheduler that flushes the data 
periodically at an interval set by the user.

I am not sure whether I should replace the API with the HBase batch API, since 
the current one already provides buffer-and-flush functionality.

And if I stick with this API, I think it is hard to deduplicate data by rowkey, 
because the data is buffered inside the HBase client's BufferedMutator and no 
method to remove a queued mutation is provided.
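For illustration, the scheduler-plus-buffer idea could look roughly like this 
(a toy stand-in for the buffering behavior, not the real HBase BufferedMutator 
API; all names are assumptions):

```python
import time

class BufferedSink:
    """Toy stand-in for a buffering HBase sink with both a size
    threshold and a time-based flush, as discussed above."""
    def __init__(self, max_buffer=100, flush_interval=1.0):
        self.max_buffer = max_buffer
        self.flush_interval = flush_interval
        self.buffer = []
        self.flushed = []               # pretend this is the HBase table
        self.last_flush = time.monotonic()

    def invoke(self, mutation):
        self.buffer.append(mutation)
        now = time.monotonic()
        if (len(self.buffer) >= self.max_buffer
                or now - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        self.flushed.extend(self.buffer)   # batch write to the store
        self.buffer.clear()
        self.last_flush = time.monotonic()

sink = BufferedSink(max_buffer=2, flush_interval=60.0)
sink.invoke("put:row1")
sink.invoke("put:row2")       # size threshold reached -> flush
print(sink.flushed)           # -> ['put:row1', 'put:row2']
```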

What do you think?

Best

Shimin

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Commented] (FLINK-10245) Add DataStream HBase Sink

2018-09-08 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607989#comment-16607989
 ] 

Shimin Yang commented on FLINK-10245:
-

Hi [~hequn8128],

Thanks for the reminder; I have opened comment access. For the buffer part, I 
will take a look at it and see what I can do.

Best,

Shimin

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Comment Edited] (FLINK-10245) Add DataStream HBase Sink

2018-09-08 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607989#comment-16607989
 ] 

Shimin Yang edited comment on FLINK-10245 at 9/8/18 9:50 AM:
-

Hi [~hequn8128],

Thanks for the reminder; I have opened comment access. For the buffer 
part, I will take a look at it and see what I can do.

Best,

Shimin


was (Author: dangdangdang):
Hi [~hequn8128],

Thanks for your remind and I have opened the comment access. For the buffer 
part, I will take a loot at it and see what I can do.

Best,

Shimin

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-07 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607114#comment-16607114
 ] 

Shimin Yang commented on FLINK-10247:
-

Hi [~Zentol], sounds good. I will start working on it.

> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.





[jira] [Assigned] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-07 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10247:
---

Assignee: Shimin Yang

> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.





[jira] [Reopened] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-09-06 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reopened FLINK-9567:


This issue also occurs when using the region strategy. In that case, the 
pending slots should also be checked when starting a new worker and when a 
container is allocated, before requesting a new Yarn container.

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.1, 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does 
> not release TaskManager containers in certain cases. In the worst case, I had 
> a job configured with 5 task managers that ended up possessing more than 100 
> containers. Although the job didn't fail, it affected other jobs in the Yarn 
> Cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be called by Yarn's *AMRMAsyncClient*. 
> After the restart, as we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Because the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this 
> container cannot be returned even though it is not required. Meanwhile, the 
> restart logic has already allocated enough containers for the Task Managers, 
> so Flink holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that sometimes when a Yarn container is 
> requested, many more are returned than requested. Is that a normal scenario 
> for AMRMAsyncClient?
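The accounting problem described above can be reproduced with a minimal counter 
model (illustrative only; not the actual YarnResourceManager code):

```python
class YarnResourceManagerModel:
    """Minimal model of the described bug: onContainerComplete requests a
    replacement container even when the TaskManager connection is already
    gone, so numPendingContainerRequests over-counts and the surplus
    container is kept instead of being returned to Yarn."""
    def __init__(self):
        self.num_pending_container_requests = 0
        self.containers = set()

    def on_container_complete(self, container_id, connection_known):
        self.containers.discard(container_id)
        # bug: a replacement is requested unconditionally...
        self.num_pending_container_requests += 1
        if not connection_known:
            # ...but the close is ignored, so nothing compensates
            return
        # (normal path would close the TaskManager connection here)

    def on_container_allocated(self, container_id):
        if self.num_pending_container_requests > 0:
            self.num_pending_container_requests -= 1
            self.containers.add(container_id)   # kept, though unneeded
        # else: the excess container would be returned to Yarn

rm = YarnResourceManagerModel()
rm.on_container_complete("container_24", connection_known=False)
rm.on_container_allocated("container_25")
print(len(rm.containers))  # the extra container is retained
```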





[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-05 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605338#comment-16605338
 ] 

Shimin Yang commented on FLINK-10247:
-

Hi [~till.rohrmann], do you mean that we should move the MetricQueryService out 
of the Akka actor system and use a dedicated thread pool to receive and handle 
the messages?

Cheers

> Run MetricQueryService in separate thread pool
> --
>
> Key: FLINK-10247
> URL: https://issues.apache.org/jira/browse/FLINK-10247
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> In order to make the {{MetricQueryService}} run independently of the main 
> Flink components, it should get its own dedicated thread pool assigned.





[jira] [Commented] (FLINK-10206) Add hbase sink connector

2018-09-03 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602287#comment-16602287
 ] 

Shimin Yang commented on FLINK-10206:
-

[~hequn8128] Thanks for your advice, I have attached a link to the design 
document in FLINK-10245.

> Add hbase sink connector
> 
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase source connector for batch operation. 
>  
> In some cases, we need to save Streaming/Batch results into hbase.  Just like 
> cassandra streaming/Batch sink implementations. 
>  





[jira] [Updated] (FLINK-10245) Add DataStream HBase Sink

2018-09-02 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang updated FLINK-10245:

Description: Design documentation: 
[https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]

> Add DataStream HBase Sink
> -
>
> Key: FLINK-10245
> URL: https://issues.apache.org/jira/browse/FLINK-10245
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Design documentation: 
> [https://docs.google.com/document/d/1of0cYd73CtKGPt-UL3WVFTTBsVEre-TNRzoAt5u2PdQ/edit?usp=sharing]





[jira] [Created] (FLINK-10277) Add AppendStreamTableSink and UpsertStreamTableSink

2018-09-02 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10277:
---

 Summary: Add AppendStreamTableSink and UpsertStreamTableSink
 Key: FLINK-10277
 URL: https://issues.apache.org/jira/browse/FLINK-10277
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Commented] (FLINK-10206) Add hbase sink connector

2018-08-31 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598754#comment-16598754
 ] 

Shimin Yang commented on FLINK-10206:
-

Hi [~hequn8128], I am planning to implement the table sink for the append and 
retract sinks, but I think I should finish the DataStream sink first, since the 
table sink relies on it. BTW, should I just add a link to the design document 
in the PR, or propose a FLIP?

> Add hbase sink connector
> 
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase source connector for batch operation. 
>  
> In some cases, we need to save Streaming/Batch results into hbase.  Just like 
> cassandra streaming/Batch sink implementations. 
>  





[jira] [Commented] (FLINK-10206) Add hbase sink connector

2018-08-31 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598750#comment-16598750
 ] 

Shimin Yang commented on FLINK-10206:
-

Thank you for your advice. I will briefly talk about the consistency and 
performance concerns here and work on the design document ASAP. There is an 
option in the Table Builder named enable buffer; this buffers the operations 
and flushes them into HBase when the buffer is full. During a snapshot, the 
HBase sink flushes all buffered operations to guard against failures. In 
general, it can provide an at-least-once guarantee.
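The flush-on-snapshot argument can be sketched as follows (a toy model under the 
stated assumptions; not the actual connector code):

```python
class AtLeastOnceHBaseSink:
    """Toy model: a buffering sink that flushes on every snapshot, so any
    mutation covered by an acknowledged checkpoint has reached the store.
    Illustrative only; not the real HBase connector."""
    def __init__(self, buffer_size=3):
        self.buffer_size = buffer_size
        self.buffer = []
        self.store = []                 # stands in for the HBase table

    def invoke(self, mutation):
        self.buffer.append(mutation)
        if len(self.buffer) >= self.buffer_size:
            self._flush()

    def snapshot_state(self):
        # checkpoint barrier: flush everything before acknowledging, so a
        # failure after this point can replay records but never drop them
        self._flush()

    def _flush(self):
        self.store.extend(self.buffer)
        self.buffer.clear()

sink = AtLeastOnceHBaseSink()
sink.invoke("put:a")
sink.snapshot_state()          # checkpoint forces the flush
print(sink.store)              # -> ['put:a']
```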

> Add hbase sink connector
> 
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase source connector for batch operation. 
>  
> In some cases, we need to save Streaming/Batch results into hbase.  Just like 
> cassandra streaming/Batch sink implementations. 
>  





[jira] [Created] (FLINK-10248) Add HBase Table Sink

2018-08-29 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10248:
---

 Summary: Add HBase Table Sink
 Key: FLINK-10248
 URL: https://issues.apache.org/jira/browse/FLINK-10248
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Created] (FLINK-10245) Add DataStream HBase Sink

2018-08-29 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10245:
---

 Summary: Add DataStream HBase Sink
 Key: FLINK-10245
 URL: https://issues.apache.org/jira/browse/FLINK-10245
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Closed] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-08-29 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang closed FLINK-9567.
--

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.1, 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does 
> not release TaskManager containers in certain cases. In the worst case, I had 
> a job configured with 5 task managers that ended up possessing more than 100 
> containers. Although the job didn't fail, it affected other jobs in the Yarn 
> Cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should be called by Yarn's *AMRMAsyncClient*. 
> After the restart, as we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Because the return of excess containers is governed by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this 
> container cannot be returned even though it is not required. Meanwhile, the 
> restart logic has already allocated enough containers for the Task Managers, 
> so Flink holds the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that sometimes when a Yarn container is 
> requested, many more are returned than requested. Is that a normal scenario 
> for AMRMAsyncClient?





[jira] [Commented] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592501#comment-16592501
 ] 

Shimin Yang commented on FLINK-10206:
-

Our streaming jobs rely heavily on HBase as a sink. I am willing to do the 
streaming part of the HBase sink. Can we split this into two subtasks, one for 
batch and one for streaming?

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase connector for batch operation. 
>  
> In some cases, we need to save Streaming result into hbase.  Just like 
> cassandra streaming sink implementations. 
>  





[jira] [Assigned] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10206:
---

Assignee: Shimin Yang

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase connector for batch operation. 
>  
> In some cases, we need to save Streaming result into hbase.  Just like 
> cassandra streaming sink implementations. 
>  





[jira] [Commented] (FLINK-9054) IllegalStateException: Buffer pool is destroyed

2018-08-16 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582054#comment-16582054
 ] 

Shimin Yang commented on FLINK-9054:


Could you please attach more logs? The current ones don't seem to be enough to locate the bug.

> IllegalStateException: Buffer pool is destroyed
> ---
>
> Key: FLINK-9054
> URL: https://issues.apache.org/jira/browse/FLINK-9054
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Configuration, Core
>Affects Versions: 1.4.2
>Reporter: dhiraj prajapati
>Priority: Blocker
> Attachments: flink-conf.yaml
>
>
> Hi,
> I have a flink cluster running on 2 machines, say A and B.
> Job manager is running on A. There are 2 TaksManagers, one on each node.
> So effectively, A has a job manager and a task manager, while B has a task 
> manager.
> When I submit a job to the cluster, I see below exception and the job fails:
> 2018-03-22 17:16:52,205 WARN 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while 
> emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:150)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:294)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:489)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 10 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:141)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitLatencyMarker(OperatorChain.java:604)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:824)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:679)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:662)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:486)
>  ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:203)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:102)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:138)
>  ... 19 more
>  
> The exception does not come when I run only one JobManager (only on machine 
> B).
>

[jira] [Created] (FLINK-10000) There's a mistake in the comments of CoGroupedStreams.java

2018-07-31 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10000:
---

 Summary: There's a mistake in the comments of CoGroupedStreams.java
 Key: FLINK-10000
 URL: https://issues.apache.org/jira/browse/FLINK-10000
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Shimin Yang
Assignee: Shimin Yang


The example in the Javadoc of CoGroupedStreams:

    DataStream result = one.coGroup(two)
        .where(new MyFirstKeySelector())
        .equalTo(new MyFirstKeySelector())
        .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .apply(new MyCoGroupFunction());

The parameter of the equalTo method should be MySecondKeySelector instead of 
MyFirstKeySelector.





[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516972#comment-16516972
 ] 

Shimin Yang commented on FLINK-9567:


Thanks for assigning the issue to me; I will do my best to fix it.

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does 
> not release task manager containers in certain cases. In the worst case, I 
> had a job configured for 5 task managers that ended up holding more than 100 
> containers. Although the job did not fail, it affected other jobs in the 
> Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. The container was killed before the restart, but 
> *YarnResourceManager* never received the *onContainerComplete* callback, 
> which should have been invoked by Yarn's *AMRMAsyncClient*. After the 
> restart, as we can see in line 347 of the FlinkYarnProblem log:
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which runs on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it no longer had a connection to the TaskManager on 
> container 24, so it simply ignored the close:
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which incremented the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since the return of excess containers is gated by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, this 
> container cannot be returned even though it is not needed. Meanwhile, the 
> restart logic has already allocated enough containers for the Task Managers, 
> so Flink holds the extra container indefinitely for no purpose.
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I found is that a request for a Yarn container 
> sometimes returns many more containers than requested. Is that normal 
> behavior for AMRMAsyncClient?
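
The pending-request accounting described above can be sketched as a toy model. 
This is a hypothetical, heavily simplified illustration: the class and field 
names only loosely mirror *YarnResourceManager*, and it is not the actual 
Flink code. It contrasts the reported behavior (always requesting a 
replacement container) with a variant that requests one only when the 
completed container actually hosted a connected TaskManager.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the container-request counter; NOT the real YarnResourceManager.
class PendingRequestModel {
    int numPendingContainerRequests = 0;
    final Set<String> openTaskManagerConnections = new HashSet<>();

    // Reported behavior: always request a replacement container, even when
    // there is no open TaskManager connection for the completed container.
    // The stale pending request then blocks the return of a surplus container.
    void onContainerCompletedAlwaysRequest(String containerId) {
        numPendingContainerRequests++;                 // requestYarnContainer()
        openTaskManagerConnections.remove(containerId); // may be a no-op
    }

    // Alternative: request a replacement only if the completed container
    // actually hosted a connected TaskManager.
    void onContainerCompletedGuarded(String containerId) {
        if (openTaskManagerConnections.remove(containerId)) {
            numPendingContainerRequests++;
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        // Container 24 was killed before the restart, so no connection exists.
        PendingRequestModel reported = new PendingRequestModel();
        reported.onContainerCompletedAlwaysRequest("container_24");
        System.out.println(reported.numPendingContainerRequests); // 1: stale request

        PendingRequestModel guarded = new PendingRequestModel();
        guarded.onContainerCompletedGuarded("container_24");
        System.out.println(guarded.numPendingContainerRequests);  // 0: no stale request
    }
}
```

With the stale pending request in place, the manager believes a container is 
still owed to it, so an unneeded container allocated by Yarn is never returned.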



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-17 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515018#comment-16515018
 ] 

Shimin Yang commented on FLINK-9567:


I think I can fix the issue, but I cannot assign it to myself.

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-14 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513390#comment-16513390
 ] 

Shimin Yang commented on FLINK-9567:


I modified the onContainerCompleted method and am running a test.

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Priority: Critical
> Attachments: FlinkYarnProblem, fulllog.txt
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   >