[jira] [Updated] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuta Morisawa updated FLINK-8532:

Description:
Under some conditions, RebalancePartitioner does not balance data correctly, because every instance uses the same value to select the next downstream operator. RebalancePartitioner initializes its partition id to the same value in every thread, so it does balance data over time, but at any given moment the amount of data sent to each downstream operator is skewed. The skew is particularly severe when the upstream operators produce data at the same rate.

Example:
Consider a simple operator chain:
-> map1 -> rebalance -> map2 ->
Each map operator (map1, map2) has three subtasks: st1, st2 and st3 belong to map1, and st4, st5 and st6 belong to map2.

map1  map2
st1   st4
st2   st5
st3   st6

At the beginning, every subtask of map1 sends data to st4 of map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2 and st3 all send data to st5, because each of them incremented its partition id after processing the previous record.
In my environment, processing takes twice as long with RebalancePartitioner as with other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.

was:
Under some conditions, RebalancePartitioner does not balance data correctly, because every instance uses the same value to select the next downstream operator. RebalancePartitioner initializes its partition id to the same value in every thread, so it does balance data over time, but at any given moment the amount of data sent to each downstream operator is skewed. The skew is particularly severe when the upstream operators produce data at the same rate.

Example:
Consider a simple operator chain:
--> map1 ---rebalance--> map2 -->
Each map operator (map1, map2) has three subtasks: st1, st2 and st3 belong to map1, and st4, st5 and st6 belong to map2.

map1  map2
st1   st4
st2   st5
st3   st6

At the beginning, every subtask of map1 sends data to st4 of map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2 and st3 all send data to st5, because each of them incremented its partition id after processing the previous record.
In my environment, processing takes twice as long with RebalancePartitioner as with other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.

> RebalancePartitioner should use Random value for its first partition
>
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Yuta Morisawa
> Priority: Minor
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
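The lockstep pattern described above can be reproduced with a small simulation. This is a minimal sketch, not Flink's RebalancePartitioner: it assumes each upstream subtask keeps its own round-robin counter and that all upstream subtasks emit exactly one record per tick. It compares a shared starting channel with a start offset derived from the upstream subtask index, which is one reading of "its own operator id" in the proposal. All class and method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical simulation of per-subtask round-robin channel selection; not Flink's implementation. */
public class RebalanceSkewSketch {

    /** Round-robin selector owned by one upstream subtask (assumed model of the per-thread counter). */
    static final class RoundRobinSelector {
        private final int numChannels;
        private int next;  // channel that receives the next record

        RoundRobinSelector(int initialChannel, int numChannels) {
            this.numChannels = numChannels;
            this.next = initialChannel % numChannels;
        }

        int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    /** Creates one selector per upstream subtask, either all starting at 0 or offset by subtask index. */
    static RoundRobinSelector[] create(int upstreamParallelism, int numChannels, boolean offsetBySubtaskIndex) {
        RoundRobinSelector[] selectors = new RoundRobinSelector[upstreamParallelism];
        for (int subtask = 0; subtask < upstreamParallelism; subtask++) {
            int start = offsetBySubtaskIndex ? subtask : 0;
            selectors[subtask] = new RoundRobinSelector(start, numChannels);
        }
        return selectors;
    }

    /** Every upstream subtask emits one record in the same tick; returns records per downstream channel. */
    static int[] loadInOneTick(RoundRobinSelector[] upstream, int numChannels) {
        int[] load = new int[numChannels];
        for (RoundRobinSelector selector : upstream) {
            load[selector.selectChannel()]++;
        }
        return load;
    }

    public static void main(String[] args) {
        RoundRobinSelector[] sameStart = create(3, 3, false);   // st1..st3 all start at st4
        RoundRobinSelector[] offsetStart = create(3, 3, true);  // st1 -> st4, st2 -> st5, st3 -> st6
        for (int tick = 0; tick < 2; tick++) {
            System.out.println("tick " + tick
                    + ": same start " + Arrays.toString(loadInOneTick(sameStart, 3))
                    + ", offset start " + Arrays.toString(loadInOneTick(offsetStart, 3)));
        }
        // tick 0: same start [3, 0, 0], offset start [1, 1, 1]
        // tick 1: same start [0, 3, 0], offset start [1, 1, 1]
    }
}
```

With a shared start, each tick sends all three records to a single downstream channel; with per-subtask offsets, every tick is spread evenly. The issue title suggests a random start value instead: that also breaks the lockstep pattern on average, but unlike the subtask-index offset it does not guarantee an even split in every tick, since two subtasks may draw the same start.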
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346376#comment-16346376 ]

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371

I checked, and the Travis error is not related to this issue.

> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
> Issue Type: Improvement
> Components: Logging
> Reporter: Xu Mingmin
> Assignee: mingleizhang
> Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very
> large log files.
> In most, if not all, cases we need to enable rotation in a production
> cluster, and I suppose it's a good idea to make rotation the default.
[jira] [Created] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
Yuta Morisawa created FLINK-8532:

Summary: RebalancePartitioner should use Random value for its first partition
Key: FLINK-8532
URL: https://issues.apache.org/jira/browse/FLINK-8532
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Yuta Morisawa

Under some conditions, RebalancePartitioner does not balance data correctly, because every instance uses the same value to select the next downstream operator. RebalancePartitioner initializes its partition id to the same value in every thread, so it does balance data over time, but at any given moment the amount of data sent to each downstream operator is skewed. The skew is particularly severe when the upstream operators produce data at the same rate.

Example:
Consider a simple operator chain:
---> map1 ---rebalance---> map2 --->
Each map operator (map1, map2) has three subtasks: st1, st2 and st3 belong to map1, and st4, st5 and st6 belong to map2.

map1  map2
st1   st4
st2   st5
st3   st6

At the beginning, every subtask of map1 sends data to st4 of map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2 and st3 all send data to st5, because each of them incremented its partition id after processing the previous record.
In my environment, processing takes twice as long with RebalancePartitioner as with other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.
[jira] [Updated] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuta Morisawa updated FLINK-8532:

Description:
Under some conditions, RebalancePartitioner does not balance data correctly, because every instance uses the same value to select the next downstream operator. RebalancePartitioner initializes its partition id to the same value in every thread, so it does balance data over time, but at any given moment the amount of data sent to each downstream operator is skewed. The skew is particularly severe when the upstream operators produce data at the same rate.

Example:
Consider a simple operator chain:
--> map1 ---rebalance--> map2 -->
Each map operator (map1, map2) has three subtasks: st1, st2 and st3 belong to map1, and st4, st5 and st6 belong to map2.

map1  map2
st1   st4
st2   st5
st3   st6

At the beginning, every subtask of map1 sends data to st4 of map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2 and st3 all send data to st5, because each of them incremented its partition id after processing the previous record.
In my environment, processing takes twice as long with RebalancePartitioner as with other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.

was:
Under some conditions, RebalancePartitioner does not balance data correctly, because every instance uses the same value to select the next downstream operator. RebalancePartitioner initializes its partition id to the same value in every thread, so it does balance data over time, but at any given moment the amount of data sent to each downstream operator is skewed. The skew is particularly severe when the upstream operators produce data at the same rate.

Example:
Consider a simple operator chain:
---> map1 ---rebalance---> map2 --->
Each map operator (map1, map2) has three subtasks: st1, st2 and st3 belong to map1, and st4, st5 and st6 belong to map2.

map1  map2
st1   st4
st2   st5
st3   st6

At the beginning, every subtask of map1 sends data to st4 of map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2 and st3 all send data to st5, because each of them incremented its partition id after processing the previous record.
In my environment, processing takes twice as long with RebalancePartitioner as with other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.
[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371

I checked, and the Travis error is not related to this issue.

---
[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346366#comment-16346366 ]

ASF GitHub Bot commented on FLINK-7095:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375

@tillrohrmann I just checked, and it seems that currently no ```Option``` is registered for parsing the ```configDir``` parameter. I could either register the ```configDir``` parameter somewhere, for example in the ```CliFrontendParser``` class, or directly use ```TaskManager.parseArgsAndLoadConfig(args);``` to get the ```Configuration```. What do you think, @tillrohrmann? Thanks.

> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Till Rohrmann
> Priority: Minor
> Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point
> {{TaskManagerRunner#main}}. At the moment, we are simply using
> {{ParameterTool}} as a temporary solution.
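The first option above, registering an explicit configDir parameter, can be illustrated without any CLI library. This is a hypothetical sketch: the `--configDir` flag spelling, the method names and the error messages are assumptions, not TaskManagerRunner code. A real change would presumably register an Option with the Apache Commons CLI parser that CliFrontendParser already uses, rather than hand-rolling the parsing.

```java
import java.util.Optional;

/** Hypothetical sketch of parsing a required --configDir argument; not Flink's TaskManagerRunner code. */
public class ConfigDirArgsSketch {

    static final String CONFIG_DIR_FLAG = "--configDir";  // assumed flag spelling

    /** Accepts both "--configDir /path" and "--configDir=/path"; returns empty if the flag is absent. */
    static Optional<String> findConfigDir(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals(CONFIG_DIR_FLAG)) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException(CONFIG_DIR_FLAG + " requires a value");
                }
                return Optional.of(args[i + 1]);
            }
            if (arg.startsWith(CONFIG_DIR_FLAG + "=")) {
                return Optional.of(arg.substring(CONFIG_DIR_FLAG.length() + 1));
            }
        }
        return Optional.empty();
    }

    /** Fails fast with a usage message instead of silently continuing without a configuration. */
    static String requireConfigDir(String[] args) {
        return findConfigDir(args).orElseThrow(() ->
                new IllegalArgumentException("Usage: TaskManagerRunner " + CONFIG_DIR_FLAG + " <dir>"));
    }

    public static void main(String[] args) {
        System.out.println(requireConfigDir(new String[] {"--configDir", "/opt/flink/conf"}));
    }
}
```

Failing fast on a missing value gives a clearer error than starting the TaskManager with an empty configuration.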
[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375

@tillrohrmann I just checked, and it seems that currently no ```Option``` is registered for parsing the ```configDir``` parameter. I could either register the ```configDir``` parameter somewhere, for example in the ```CliFrontendParser``` class, or directly use ```TaskManager.parseArgsAndLoadConfig(args);``` to get the ```Configuration```. What do you think, @tillrohrmann? Thanks.

---
[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden
[ https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346348#comment-16346348 ]

ASF GitHub Bot commented on FLINK-8407:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369

Ah ha, it doesn't matter. 😄 The test has been updated. Actually, I wanted to ensure that all the partitioning methods cause the exception. However, that would be fussy, so I only kept the broadcast one.

> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
> Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) on a {{DataStream}}
> create new {{DataStreams}}, which allow users to set parallelisms on them.
> However, the {{PartitionTransformations}} in these returned {{DataStreams}}
> only add virtual nodes to the execution graph, and the parallelism of a virtual
> node cannot be specified. We should forbid users from setting the parallelism
> after a partitioning operation, since the setting has no actual effect.
> The corresponding documentation should also be updated.
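The distinction between physical operators and the virtual nodes created by PartitionTransformation can be modelled in a few lines. This is a hypothetical toy model, not Flink's DataStream API (all class and method names are invented for illustration): a partitioned stream is represented by a virtual node, and setParallelism on it fails fast instead of being silently ignored, which is the behaviour the issue asks for.

```java
/** Toy model of physical vs. virtual stream nodes; names are hypothetical, not Flink classes. */
public class VirtualPartitionSketch {

    static class StreamNode {
        private final String name;
        private final boolean virtual;  // true for nodes produced by a partitioning step
        private int parallelism;

        StreamNode(String name, boolean virtual, int parallelism) {
            this.name = name;
            this.virtual = virtual;
            this.parallelism = parallelism;
        }

        /** A partitioning step only changes how records are routed, so it yields a virtual node. */
        StreamNode shuffle() {
            return new StreamNode(name + ".shuffle", true, parallelism);
        }

        /** A map step produces a physical operator whose parallelism can be configured. */
        StreamNode map(String operatorName) {
            return new StreamNode(operatorName, false, parallelism);
        }

        StreamNode setParallelism(int p) {
            if (virtual) {
                // Fail fast: a virtual node runs no tasks of its own, so the setting would have no effect.
                throw new UnsupportedOperationException(
                        "Cannot set the parallelism of partitioning node " + name);
            }
            if (p < 1) {
                throw new IllegalArgumentException("Parallelism must be at least 1");
            }
            this.parallelism = p;
            return this;
        }

        int getParallelism() {
            return parallelism;
        }
    }

    public static void main(String[] args) {
        StreamNode source = new StreamNode("source", false, 4);
        StreamNode mapped = source.shuffle().map("map").setParallelism(8);  // allowed: physical operator
        System.out.println("map parallelism = " + mapped.getParallelism());
        try {
            source.shuffle().setParallelism(8);  // rejected: virtual partitioning node
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing instead of ignoring the call surfaces the mistake at job-construction time, which matches the test described in the comment above (checking that a partitioning method causes the exception).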
[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369

Ah ha, it doesn't matter. 😄 The test has been updated. Actually, I wanted to ensure that all the partitioning methods cause the exception. However, that would be fussy, so I only kept the broadcast one.

---
[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support
[ https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346320#comment-16346320 ]

ASF GitHub Bot commented on FLINK-8101:
---

Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r164961589

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -32,7 +32,6 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
--- End diff --

This import should not be deleted; it is used by {@link Client}.

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
> Issue Type: New Feature
> Components: ElasticSearch Connector
> Affects Versions: 1.4.0
> Reporter: Hai Zhou UTC+8
> Assignee: Flavio Pompermaier
> Priority: Major
> Fix For: 1.5.0
>
>
> Elasticsearch 6.0.0 was recently released:
> https://www.elastic.co/blog/elasticsearch-6-0-0-released
> The minimum ES6-compatible version of the Elasticsearch Java client is 5.6.0.
[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...
Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r164961589

--- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
@@ -32,7 +32,6 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
--- End diff --

This import should not be deleted; it is used by {@link Client}.

---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346257#comment-16346257 ]

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -93,6 +93,12 @@
     /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
     private final KinesisDeserializationSchema deserializer;
 
+    /**
+     * The function that determines which subtask a shard should be assigned to.
+     */
+    // TODO: instead of the property, use a factory method that would allow subclass to access source context?
--- End diff --

A createFn(...) method would allow the function to be created with access to the runtime context (such as the number of subtasks); the function's signature could then be changed to take only the shard metadata as a parameter. Subclasses can override createFn instead of setting the property.

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.4.0, 1.3.2, 1.5.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over
> subtasks in round-robin fashion. This works as long as shard identifiers are
> sequential. After shards are rebalanced in Kinesis, that may no longer be the
> case, and the distribution becomes skewed.
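The skew from the issue and the factory-method idea from the review comment can be sketched together. Everything here is hypothetical, not the FlinkKinesisConsumer API: shards are assumed to be identified by strings such as `shardId-000000000002`, the default assignment is simplified to `Math.abs(shardId.hashCode() % numSubtasks)`, and `createAssigner(...)` stands in for the proposed createFn(...). Because Java's String hash of ids that differ only in the last digit differs by exactly that digit difference, sequential ids alternate between two subtasks, while ids 2, 4, 6, 8 (for example, the shards left open after resharding) all land on one subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of pluggable shard-to-subtask assignment; not the FlinkKinesisConsumer API. */
public class ShardAssignmentSketch {

    /** Maps a shard id to a subtask index in [0, numSubtasks). */
    interface ShardAssigner {
        int assign(String shardId);
    }

    /** Base consumer with an overridable factory method, in the spirit of the proposed createFn(...). */
    static class ConsumerSketch {
        /** Default: hash the shard id (simplified stand-in for the hash-based assignment in the issue). */
        protected ShardAssigner createAssigner(int numSubtasks, List<String> knownShards) {
            return shardId -> Math.abs(shardId.hashCode() % numSubtasks);
        }

        /** Counts how many of the given shards each subtask would consume. */
        final int[] shardsPerSubtask(List<String> shards, int numSubtasks) {
            ShardAssigner assigner = createAssigner(numSubtasks, shards);
            int[] counts = new int[numSubtasks];
            for (String shard : shards) {
                counts[assigner.assign(shard)]++;
            }
            return counts;
        }
    }

    /** Overrides the factory: assigns by rank in the sorted list of shards known up front. */
    static class RankBasedConsumer extends ConsumerSketch {
        @Override
        protected ShardAssigner createAssigner(int numSubtasks, List<String> knownShards) {
            List<String> sorted = new ArrayList<>(knownShards);
            Collections.sort(sorted);
            return shardId -> {
                int rank = Collections.binarySearch(sorted, shardId);
                if (rank < 0) {
                    throw new IllegalArgumentException("Unknown shard: " + shardId);
                }
                return rank % numSubtasks;
            };
        }
    }

    /** Builds ids in the assumed format shardId-000000000002. */
    static List<String> shardIds(int... numbers) {
        List<String> ids = new ArrayList<>();
        for (int n : numbers) {
            ids.add(String.format("shardId-%012d", n));
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> sequential = shardIds(0, 1, 2, 3);
        List<String> afterResharding = shardIds(2, 4, 6, 8);  // e.g. only even ids still open
        ConsumerSketch hashBased = new ConsumerSketch();
        ConsumerSketch rankBased = new RankBasedConsumer();
        System.out.println("hash, sequential:       " + Arrays.toString(hashBased.shardsPerSubtask(sequential, 2)));
        System.out.println("hash, after resharding: " + Arrays.toString(hashBased.shardsPerSubtask(afterResharding, 2)));
        System.out.println("rank, after resharding: " + Arrays.toString(rankBased.shardsPerSubtask(afterResharding, 2)));
    }
}
```

The rank-based override is only there to show how a subclass plugs in a different strategy; it is stable only if every subtask sees the same shard list, which does not hold for shards discovered over time.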
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -93,6 +93,12 @@
     /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
     private final KinesisDeserializationSchema deserializer;
 
+    /**
+     * The function that determines which subtask a shard should be assigned to.
+     */
+    // TODO: instead of the property, use a factory method that would allow subclass to access source context?
--- End diff --

A createFn(...) method would allow the function to be created with access to the runtime context (such as the number of subtasks); the function's signature could then be changed to take only the shard metadata as a parameter. Subclasses can override createFn instead of setting the property.

---
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346201#comment-16346201 ]

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371

I have changed the code. @StephanEwen @XuMingmin, reviews are welcome. Thanks! I have now set ```MaxFileSize``` to 200MB and log retention to 30 days.
[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371

I have changed the code. @StephanEwen @XuMingmin, reviews are welcome. Thanks! I have now set ```MaxFileSize``` to 200MB and log retention to 30 days.

---
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ]

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:09 AM:

Hi [~aljoscha], thank you for your reply! Please look at the following picture:
!image-2018-01-31-10-48-59-633.png!
The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", which is the Kafka timestamp, is not used! So I think the deserialize method of KeyedDeserializationSchema should take an additional parameter, the Kafka message timestamp (from ConsumerRecord). This is useful in some business scenarios. Thanks!

was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the following picture:
!image-2018-01-31-10-48-59-633.png!
The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", which is the Kafka timestamp, is not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for the
> Kafka message timestamp (from ConsumerRecord). This is useful in some
> business scenarios!
>
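The change requested above amounts to passing the Kafka record timestamp through to the user's deserializer. A minimal sketch under stated assumptions: `TimestampedDeserializer` and `Event` are invented names, not Flink API, and the timestamp argument would be filled by the fetcher from `ConsumerRecord#timestamp()` (available in the Kafka 0.10+ client). The existing KeyedDeserializationSchema#deserialize has no such parameter, which is the gap the issue describes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: a deserializer that also receives the Kafka record timestamp. */
public class TimestampedDeserializationSketch {

    /** Invented interface: the usual keyed-schema inputs plus the record timestamp. */
    interface TimestampedDeserializer<T> {
        T deserialize(byte[] key, byte[] value, String topic, int partition, long offset, long timestamp);
    }

    /** Example element type that keeps the Kafka timestamp next to the payload. */
    static final class Event {
        final String payload;
        final long kafkaTimestamp;

        Event(String payload, long kafkaTimestamp) {
            this.payload = payload;
            this.kafkaTimestamp = kafkaTimestamp;
        }
    }

    /** Decodes the value as UTF-8 text and stores the timestamp alongside it. */
    static final TimestampedDeserializer<Event> UTF8_EVENTS =
            (key, value, topic, partition, offset, timestamp) ->
                    new Event(new String(value, StandardCharsets.UTF_8), timestamp);

    public static void main(String[] args) {
        // In a fetcher, the last argument would come from ConsumerRecord#timestamp().
        Event e = UTF8_EVENTS.deserialize(null, "hello".getBytes(StandardCharsets.UTF_8),
                "events", 0, 42L, 1517367000000L);
        System.out.println(e.payload + " @ " + e.kafkaTimestamp);
    }
}
```

Carrying the timestamp inside the element lets a downstream timestamp extractor read it from the element itself, even when, as noted in the comment, the extractor ignores `elementPrevTimestamp`.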
[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ]

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:01 AM:

Hi [~aljoscha], thank you for your reply! Please look at the following picture:
!image-2018-01-31-10-48-59-633.png!
The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", which is the Kafka timestamp, is not used! Thanks!

was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the following picture:
!image-2018-01-31-10-48-59-633.png!
The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is not used! Thanks!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346180#comment-16346180 ]

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], thank you for your reply! Please look at the following picture:
!image-2018-01-31-10-48-59-633.png!
The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the "KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is not used! Thanks!
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhu.qing updated FLINK-8526:

Description:
The attached program gets stuck at certain parallelism values in some situations. With the setting given in the Environment field (parallelism 80), the program always gets stuck, whereas with parallelism 100 everything works. According to my research, the program is not fastest when the parallelism equals the number of task slots, probably because there are not enough network buffers. How are network buffers related to parallelism, and how is parallelism related to the number of running tasks? (In other words, we have 160 task slots, but the number of running tasks can be far greater than the number of task slots.)
The parallelism cannot be set to half the number of CPU cores; otherwise a "java.io.FileNotFoundException" is thrown. You can reproduce the exception on your own PC by setting the parallelism to half the number of your CPU cores.

was:
The attached program gets stuck at certain parallelism values in some situations. With the setting given in the Environment field (parallelism 80), the program always gets stuck, whereas with parallelism 100 everything works. According to my research, the program is not fastest when the parallelism equals the number of task slots, probably because there are not enough network buffers. How are network buffers related to parallelism, and how is parallelism related to the number of running tasks? (In other words, we have 160 task slots, but the number of running tasks can be far greater than the number of task slots.)
The parallelism cannot be set to half the number of CPU cores; otherwise a "java.io.FileNotFoundException" is thrown.

> When use parallelism equals to half of the number of cpu, join and shuffle
> operators will easly cause deadlock.
> ---
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Java API, Local Runtime
> Affects Versions: 1.4.0
> Environment: 8 machines (96 GB and 24 cores each) with 20 task slots per
> TaskManager; twitter-2010 dataset; parallelism set to 80. I run my
> code in standalone mode.
> Reporter: zhu.qing
> Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
> Original Estimate: 72h
> Remaining Estimate: 72h
>
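On the question of how network buffers relate to parallelism: the Flink 1.x configuration documentation gives a rule of thumb (assumed here; check the documentation for your version) that the number of network buffers should be about slots-per-TaskManager² × number-of-TaskManagers × 4, since in the worst case every slot exchanges data with every other slot. The helper below applies that rule to the environment in this report (8 machines, 20 task slots each, assuming one TaskManager per machine). It is a back-of-the-envelope sketch with an assumed 32 KiB buffer size and does not diagnose the reported hang.

```java
/** Back-of-the-envelope network buffer estimate; the rule of thumb and buffer size are assumptions. */
public class NetworkBufferEstimate {

    /** Assumed network buffer (memory segment) size: 32 KiB. */
    static final long ASSUMED_BUFFER_BYTES = 32L * 1024;

    /** Rule of thumb: slotsPerTaskManager^2 * numTaskManagers * 4. */
    static long recommendedBuffers(int slotsPerTaskManager, int numTaskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4;
    }

    static long bytesFor(long buffers) {
        return buffers * ASSUMED_BUFFER_BYTES;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 20;  // from the Environment field of this issue
        int taskManagers = 8;          // 8 machines, assuming one TaskManager per machine
        long buffers = recommendedBuffers(slotsPerTaskManager, taskManagers);
        long mib = bytesFor(buffers) / (1024 * 1024);
        System.out.println(buffers + " buffers, about " + mib + " MiB");  // prints: 12800 buffers, about 400 MiB
    }
}
```

As for running tasks exceeding slots: by default Flink lets subtasks of different operators in the same job share a slot, so one slot can host a whole pipeline of subtasks, and the number of running tasks can be much larger than 160.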
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhu.qing updated FLINK-8526:

Description:
The attached program gets stuck at certain parallelism values in some situations. With the setting given in the Environment field (parallelism 80), the program always gets stuck, whereas with parallelism 100 everything works. According to my research, the program is not fastest when the parallelism equals the number of task slots, probably because there are not enough network buffers. How are network buffers related to parallelism, and how is parallelism related to the number of running tasks? (In other words, we have 160 task slots, but the number of running tasks can be far greater than the number of task slots.)
The parallelism cannot be set to half the number of CPU cores; otherwise a "java.io.FileNotFoundException" is thrown.

was:
The attached program gets stuck at certain parallelism values in some situations. With the setting given in the Environment field (parallelism 80), the program always gets stuck, whereas with parallelism 100 everything works. According to my research, the program is not fastest when the parallelism equals the number of task slots, probably because there are not enough network buffers. How are network buffers related to parallelism, and how is parallelism related to the number of running tasks? (In other words, we have 160 task slots, but the number of running tasks can be far greater than the number of task slots.)
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu.qing updated FLINK-8526:
Description: The attached program gets stuck at certain parallelism settings. With parallelism 80 in the setup described below, the program always gets stuck; with parallelism 100, everything works. From my investigation, when the parallelism equals the number of task slots the program is not the fastest, and the hang is probably caused by insufficient network buffers. How are network buffers related to parallelism, and how does parallelism relate to the number of running tasks? (In other words, we have 160 task slots, but the number of running tasks can be far higher than that.)

> When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Java API, Local Runtime
> Affects Versions: 1.4.0
> Environment: 8 machines (96 GB, 24 cores each) with 20 task slots per TaskManager; twitter-2010 dataset; parallelism set to 80; standalone mode.
> Reporter: zhu.qing
> Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
> Original Estimate: 72h
> Remaining Estimate: 72h

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
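On the question of how network buffers relate to parallelism: older Flink configuration documentation gave the rule of thumb `#slots-per-TM^2 * #TMs * 4` for the `taskmanager.network.numberOfBuffers` setting (newer releases size network memory through `taskmanager.network.memory.fraction`, `.min` and `.max` instead). The sketch below only applies that heuristic to the environment in this report (8 TaskManagers, 20 slots each), assuming the default 32 KiB memory segment size; it is a rough estimate and does not by itself explain the hang.

```java
/**
 * Rough network-buffer estimate using the heuristic from older Flink docs:
 * slotsPerTaskManager^2 * numTaskManagers * 4. A heuristic, not a guarantee.
 */
public class NetworkBufferEstimate {

    static long estimateBuffers(int slotsPerTaskManager, int numTaskManagers) {
        long slots = slotsPerTaskManager;
        // Quadratic in slots per TaskManager, linear in the number of
        // TaskManagers; the factor 4 is headroom taken from the heuristic.
        return slots * slots * numTaskManagers * 4L;
    }

    public static void main(String[] args) {
        // Values from the reported environment: 8 TaskManagers, 20 slots each.
        long buffers = estimateBuffers(20, 8);
        // Assumption: default memory segment size of 32 KiB per buffer.
        long bytes = buffers * 32L * 1024L;
        // prints "12800 buffers, about 400 MiB"
        System.out.println(buffers + " buffers, about " + (bytes / (1024 * 1024)) + " MiB");
    }
}
```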
[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yanxiaobin updated FLINK-8500:
--
Attachment: image-2018-01-31-10-48-59-633.png

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for the Kafka message timestamp (taken from ConsumerRecord). This is useful in some business scenarios.
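A minimal sketch of what the requested signature could look like. `TimestampedKeyedDeserializationSchema`, `Event` and `EventSchema` are hypothetical names for illustration only; the first five parameters mirror `KeyedDeserializationSchema#deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`, and the extra `timestamp` argument stands for the value a Kafka 0.10 fetcher could read from `ConsumerRecord#timestamp()`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TimestampedSchemaSketch {

    /** Hypothetical variant of KeyedDeserializationSchema with the record timestamp added. */
    interface TimestampedKeyedDeserializationSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic,
                      int partition, long offset, long timestamp) throws IOException;
    }

    /** Simple value type carrying the payload together with the Kafka timestamp. */
    static final class Event {
        final String value;
        final long timestamp;

        Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Example schema: decode the value as UTF-8 and keep the Kafka timestamp. */
    static final class EventSchema implements TimestampedKeyedDeserializationSchema<Event> {
        @Override
        public Event deserialize(byte[] messageKey, byte[] message, String topic,
                                 int partition, long offset, long timestamp) {
            String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
            return new Event(value, timestamp);
        }
    }

    public static void main(String[] args) {
        EventSchema schema = new EventSchema();
        // In a fetcher, the last argument would come from ConsumerRecord#timestamp().
        Event e = schema.deserialize(null, "click".getBytes(StandardCharsets.UTF_8),
                "events", 0, 42L, 1517366939000L);
        System.out.println(e.value + " @ " + e.timestamp); // prints "click @ 1517366939000"
    }
}
```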
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu.qing updated FLINK-8526:
Summary: When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock. (was: When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.)
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu.qing updated FLINK-8526:
Summary: When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock. (was: When use parallelism equals to half of the taskslot, join and shuffle operators will easly caused deadlock.)
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly caused deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu.qing updated FLINK-8526:
Summary: When use parallelism equals to half of the taskslot, join and shuffle operators will easly caused deadlock. (was: When use parallelism equals to half of the taskslot, join and shuffle operator will easly caused deadlock.)
[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operator will easly caused deadlock.
[ https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu.qing updated FLINK-8526:
Summary: When use parallelism equals to half of the taskslot, join and shuffle operator will easly caused deadlock. (was: When use some parallelism, the program will stuck in some setting. )
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346161#comment-16346161 ] ASF GitHub Bot commented on FLINK-8357:
---
Github user zhangminglei commented on a diff in the pull request:
https://github.com/apache/flink/pull/5371#discussion_r164940108

--- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties ---
@@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.org.apache.zookeeper=INFO
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.-MM-dd
--- End diff --

Will add soon.

> enable rolling in default log settings
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
> Issue Type: Improvement
> Components: Logging
> Reporter: Xu Mingmin
> Assignee: mingleizhang
> Priority: Major
> Fix For: 1.5.0
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and {{ch.qos.logback.core.FileAppender}} for logback, which can result in very large log files.
> In most, if not all, cases we need to enable rotation in a production cluster, and I suppose it's a good idea to make rotation the default.
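For reference on the `DatePattern` property: log4j 1.2's `DailyRollingFileAppender` interprets it with `java.text.SimpleDateFormat` and appends the formatted date to the file name when it rolls over; its documented default is `'.'yyyy-MM-dd`, where the quoted dot is a literal. The sketch below does not use log4j; it only reproduces the resulting suffix for an example date, and the base name `flink.log` is a placeholder.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class DatePatternSuffix {

    /** Name a daily-rolled file would get: base name plus the date formatted with the pattern. */
    static String rolledName(String baseName, String datePattern, Date date) {
        SimpleDateFormat format = new SimpleDateFormat(datePattern);
        // Fix the time zone so the example is reproducible on any machine.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return baseName + format.format(date);
    }

    public static void main(String[] args) {
        Calendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(2018, Calendar.JANUARY, 30);
        // prints "flink.log.2018-01-30"
        System.out.println(rolledName("flink.log", "'.'yyyy-MM-dd", cal.getTime()));
    }
}
```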
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346159#comment-16346159 ] ASF GitHub Bot commented on FLINK-8357:
---
Github user zhangminglei commented on a diff in the pull request:
https://github.com/apache/flink/pull/5371#discussion_r164940066

--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@ log4j.rootLogger=INFO, file
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.-MM-dd
--- End diff --

Will add soon.
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346162#comment-16346162 ] ASF GitHub Bot commented on FLINK-8357:
---
Github user zhangminglei commented on a diff in the pull request:
https://github.com/apache/flink/pull/5371#discussion_r164940131

--- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml ---
@@ -17,7 +17,7 @@ -->
-
+
--- End diff --

Will add soon.
[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164940131 --- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml --- @@ -17,7 +17,7 @@ --> - + --- End diff -- Will add soon. ---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346124#comment-16346124 ] ASF GitHub Bot commented on FLINK-8516:
---
Github user tweise closed the pull request at:
https://github.com/apache/flink/pull/5393

> FlinkKinesisConsumer does not balance shards over subtasks
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.4.0, 1.3.2, 1.5.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over subtasks round robin. This works as long as shard identifiers are sequential. After shards are rebalanced in Kinesis, that may no longer be the case, and the distribution becomes skewed.
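To illustrate the skew described in the issue, the sketch contrasts a hash-modulo mapping with a position-based round-robin mapping over the sorted list of shard IDs. The class and method names are hypothetical; the sketch assumes every subtask sees the same complete list of shard IDs (a real consumer that discovers shards over time does not get this for free), and it is not the approach of PR #5393, which makes the hash function customizable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShardAssignmentSketch {

    /** Hash-based mapping: balanced only if the hash codes happen to spread evenly modulo n. */
    static int byHash(String shardId, int numSubtasks) {
        return Math.floorMod(shardId.hashCode(), numSubtasks);
    }

    /**
     * Position-based mapping: sort all known shard IDs and deal them out round robin,
     * so each of n subtasks gets either floor(k/n) or ceil(k/n) of k shards.
     */
    static Map<Integer, List<String>> roundRobin(List<String> shardIds, int numSubtasks) {
        List<String> sorted = new ArrayList<>(shardIds);
        Collections.sort(sorted); // lexicographic; IDs such as shardId-000000000003 are zero-padded
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int s = 0; s < numSubtasks; s++) {
            assignment.put(s, new ArrayList<>());
        }
        for (int i = 0; i < sorted.size(); i++) {
            assignment.get(i % numSubtasks).add(sorted.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Non-sequential IDs, as can happen after resharding (illustrative values).
        List<String> shards = Arrays.asList(
                "shardId-000000000003", "shardId-000000000007",
                "shardId-000000000008", "shardId-000000000011");
        int parallelism = 2;
        for (String shard : shards) {
            System.out.println(shard + " -> subtask " + byHash(shard, parallelism) + " (hash)");
        }
        System.out.println(roundRobin(shards, parallelism));
    }
}
```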
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346121#comment-16346121 ] ASF GitHub Bot commented on FLINK-8516:
---
GitHub user tweise opened a pull request:
https://github.com/apache/flink/pull/5393
[FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
GitHub user tweise opened a pull request:
https://github.com/apache/flink/pull/5393
[FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing
Alternatively you can review an
[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7775:
--
Description:
{code}
public int getNumberOfCachedJobs() {
  return jobRefCounters.size();
}
{code}
This method of PermanentBlobCache is not used. We should remove it.

> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
> Issue Type: Task
> Components: Local Runtime
> Reporter: Ted Yu
> Priority: Minor
[jira] [Updated] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex
[ https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7917:
--
Component/s: Local Runtime

> The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex
> Key: FLINK-7917
> URL: https://issues.apache.org/jira/browse/FLINK-7917
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Reporter: Ted Yu
> Priority: Minor
>
> Currently in ExecutionJobVertex#getTaskInformationOrBlobKey:
> {code}
> }
> return taskInformationOrBlobKey;
> {code}
> The return should be placed inside the synchronized block.
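A minimal sketch of the pattern the issue asks for, using a hypothetical `LazyHolder` class rather than the real `ExecutionJobVertex` code: the lazily created field is written and read under the same lock, with the `return` inside the `synchronized` block, so every access to the field is consistently guarded (which is what tools that flag inconsistent synchronization check for).

```java
import java.util.function.Supplier;

/** Hypothetical lazily initialized holder; not Flink's ExecutionJobVertex. */
public class LazyHolder<T> {

    private final Object lock = new Object();
    private final Supplier<T> factory;
    private T value; // guarded by lock

    public LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the value on first use; every read and write of 'value' happens under 'lock'. */
    public T get() {
        synchronized (lock) {
            if (value == null) {
                value = factory.get();
            }
            return value; // returned while still holding the lock
        }
    }

    public static void main(String[] args) {
        LazyHolder<StringBuilder> holder = new LazyHolder<>(StringBuilder::new);
        System.out.println(holder.get() == holder.get()); // prints "true": created only once
    }
}
```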
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345958#comment-16345958 ] Bowen Li commented on FLINK-3089:
-
I wrote [this brief design doc|https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit?usp=sharing]. Can you guys please take a look? [~aljoscha] [~srichter] [~sihuazhou] [~xfournet]
What's the next step? Shall we draft a FLIP?

> State API Should Support Data Expiration (State TTL)
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Niels Basjes
> Assignee: Bowen Li
> Priority: Major
>
> In some use cases (web analytics) there is a need to have a state per visitor on a website (i.e. keyBy(sessionid)).
> At some point the visitor simply leaves and no longer creates new events (so a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, like "After 30 minutes without events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind of information. But that introduces the buffering effect of the window (which in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I can update afterwards.
> This makes it possible to create a map function that assigns the right value and that discards the state automatically.
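The requested semantics can be sketched outside Flink as a map whose entries carry an expiry time that is reset on every write, with expired entries treated as absent when read. This only illustrates the behaviour under discussion: the class name, the injected clock and the lazy clean-up on access are assumptions, not the design in the linked doc or a Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical per-key state that is discarded once it has not been updated for ttlMillis. */
public class TtlStateSketch<K, V> {

    private static final class Entry<T> {
        final T value;
        final long expiresAt;

        Entry(T value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so tests can control time

    public TtlStateSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Writing refreshes the timeout, e.g. on every event of a visitor's session. */
    public void update(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong() + ttlMillis));
    }

    /** Returns the value, or null if absent or expired; expired entries are dropped lazily. */
    public V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() >= e.expiresAt) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        // 30-minute timeout, as in the web-analytics example above.
        TtlStateSketch<String, Integer> visits = new TtlStateSketch<>(30 * 60 * 1000L, () -> now[0]);
        visits.update("session-1", 1);
        now[0] = 29 * 60 * 1000L;
        System.out.println(visits.get("session-1")); // prints "1"
        now[0] = 60 * 60 * 1000L;
        System.out.println(visits.get("session-1")); // prints "null"
    }
}
```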
[jira] [Commented] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345955#comment-16345955 ] Ted Yu commented on FLINK-7795:
---
error-prone has a JDK 8 dependency.

> Utilize error-prone to discover common coding mistakes
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
> Issue Type: Improvement
> Components: Build System
> Reporter: Ted Yu
> Priority: Major
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
[ https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-8037:
--
Labels: kafka-connect (was: )

> Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Assignee: Greg Hogan
> Priority: Minor
> Labels: kafka-connect
>
> {code}
> public Set<String> generateIdsToAbort() {
>   Set<String> idsToAbort = new HashSet<>();
>   for (int i = 0; i < safeScaleDownFactor; i++) {
>     idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are ints, whereas generateIdsToUse() expects a long parameter, so the product is computed in int arithmetic and can overflow before it is widened to long.
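Why the missing cast matters: in `i * poolSize * totalNumberOfSubtasks` every operand is an `int`, so Java evaluates the product in 32-bit arithmetic and only widens the (possibly wrapped) result to `long` when it is passed on. The two helper methods below are hypothetical stand-ins for that expression, and the input values are illustrative, chosen only so the product exceeds `Integer.MAX_VALUE`.

```java
public class IntOverflowDemo {

    /** Product computed in int arithmetic, then widened: can silently wrap around. */
    static long intProduct(int i, int poolSize, int totalNumberOfSubtasks) {
        return i * poolSize * totalNumberOfSubtasks;
    }

    /** Casting the first operand makes the whole left-to-right product long arithmetic. */
    static long longProduct(int i, int poolSize, int totalNumberOfSubtasks) {
        return (long) i * poolSize * totalNumberOfSubtasks;
    }

    public static void main(String[] args) {
        // Illustrative values only: 3 * 1,000,000 * 1,000 = 3,000,000,000 > Integer.MAX_VALUE.
        System.out.println(intProduct(3, 1_000_000, 1_000));  // prints "-1294967296"
        System.out.println(longProduct(3, 1_000_000, 1_000)); // prints "3000000000"
    }
}
```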
[jira] [Commented] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0
[ https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345946#comment-16345946 ] Ted Yu commented on FLINK-8335:
---
The 1.4.1 RC is being voted on. Once the vote passes, we can upgrade to 1.4.1.

> Upgrade hbase connector dependency to 1.4.0
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
> Issue Type: Improvement
> Components: Batch Connectors and Input/Output Formats
> Reporter: Ted Yu
> Priority: Minor
>
> hbase 1.4.0 has been released.
> 1.4.0 shows speed improvements over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.0.
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345905#comment-16345905 ] ASF GitHub Bot commented on FLINK-8357:
---
Github user XuMingmin commented on a diff in the pull request:
https://github.com/apache/flink/pull/5371#discussion_r164895415

--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@ log4j.rootLogger=INFO, file
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.-MM-dd
--- End diff --

Add `log4j.appender.file.MaxFileSize` and `log4j.appender.file.MaxBackupIndex` to limit the total size of log files.
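A note on the suggested properties: in log4j 1.2, `MaxFileSize` and `MaxBackupIndex` are properties of the size-based `org.apache.log4j.RollingFileAppender`; `DailyRollingFileAppender` does not support them. With size-based rolling, disk usage is roughly bounded by the active file plus the backups, i.e. `(MaxBackupIndex + 1) * MaxFileSize` (a file can overshoot slightly before it rolls). The sketch computes that bound; the 10 MiB / 10 backups values are illustrative assumptions, not Flink defaults.

```java
public class LogDiskBound {

    /**
     * Approximate upper bound on disk used by a size-based rolling appender:
     * the active file plus maxBackupIndex rolled files, each about maxFileSizeBytes.
     */
    static long maxTotalBytes(long maxFileSizeBytes, int maxBackupIndex) {
        return (maxBackupIndex + 1L) * maxFileSizeBytes;
    }

    public static void main(String[] args) {
        long tenMiB = 10L * 1024 * 1024; // assumption: "10MB" taken as 10 * 1024 * 1024 bytes
        long bound = maxTotalBytes(tenMiB, 10);
        System.out.println(bound / (1024 * 1024) + " MiB"); // prints "110 MiB"
    }
}
```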
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345907#comment-16345907 ] ASF GitHub Bot commented on FLINK-8357: --- Github user XuMingmin commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164895963 --- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml --- @@ -17,7 +17,7 @@ --> - + --- End diff -- add a `rollingPolicy` to limit the size of log files. > enable rolling in default log settings > -- > > Key: FLINK-8357 > URL: https://issues.apache.org/jira/browse/FLINK-8357 > Project: Flink > Issue Type: Improvement > Components: Logging >Reporter: Xu Mingmin >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The release packages use {{org.apache.log4j.FileAppender}} for log4j and > {{ch.qos.logback.core.FileAppender}} for logback, which can result in very > large log files. > In most cases, if not all, we need to enable rotation in a production > cluster, and I suppose it's a good idea to make rotation the default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8357) enable rolling in default log settings
[ https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345906#comment-16345906 ] ASF GitHub Bot commented on FLINK-8357: --- Github user XuMingmin commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164895503 --- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties --- @@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO log4j.logger.org.apache.zookeeper=INFO # Log all infos in the given file -log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.file=${log.file} log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.DatePattern=.-MM-dd --- End diff -- same as above > enable rolling in default log settings > -- > > Key: FLINK-8357 > URL: https://issues.apache.org/jira/browse/FLINK-8357 > Project: Flink > Issue Type: Improvement > Components: Logging >Reporter: Xu Mingmin >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The release packages use {{org.apache.log4j.FileAppender}} for log4j and > {{ch.qos.logback.core.FileAppender}} for logback, which can result in very > large log files. > In most cases, if not all, we need to enable rotation in a production > cluster, and I suppose it's a good idea to make rotation the default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...
Github user XuMingmin commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164895415 --- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties --- @@ -19,10 +19,11 @@ log4j.rootLogger=INFO, file # Log all infos in the given file -log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.file=${log.file} log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.DatePattern=.-MM-dd --- End diff -- add `log4j.appender.file.MaxFileSize` and `log4j.appender.file.MaxBackupIndex` to limit the total size of log files. ---
[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...
Github user XuMingmin commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164895503 --- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties --- @@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO log4j.logger.org.apache.zookeeper=INFO # Log all infos in the given file -log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.file=${log.file} log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.DatePattern=.-MM-dd --- End diff -- same as above ---
[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...
Github user XuMingmin commented on a diff in the pull request: https://github.com/apache/flink/pull/5371#discussion_r164895963 --- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml --- @@ -17,7 +17,7 @@ --> - + --- End diff -- add a `rollingPolicy` to limit the size of log files. ---
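The reviewer's request amounts to size-based rotation: cap the size of each file and the number of rolled-over backups, so that disk usage stays at roughly `MaxFileSize * (MaxBackupIndex + 1)`. Note that in log4j 1.x these two properties belong to `org.apache.log4j.RollingFileAppender`; `DailyRollingFileAppender` rolls only by `DatePattern`. The pure-JDK sketch below is a hypothetical in-memory model of that policy (`RollingLogModel` and its methods are illustrative names, not log4j code); it tracks byte counts only and writes no files.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical in-memory model of size-based log rotation, in the spirit of
 * log4j 1.x RollingFileAppender's MaxFileSize / MaxBackupIndex settings.
 * Only byte counts are tracked; no files are written.
 */
class RollingLogModel {
    private final long maxFileSize;
    private final int maxBackupIndex;
    private long activeSize = 0;
    // First element is the newest backup ("file.1"), last element the oldest.
    private final Deque<Long> backups = new ArrayDeque<>();

    RollingLogModel(long maxFileSize, int maxBackupIndex) {
        this.maxFileSize = maxFileSize;
        this.maxBackupIndex = maxBackupIndex;
    }

    /** Appends an entry of the given size, rolling over first if it would not fit. */
    void append(long bytes) {
        if (activeSize > 0 && activeSize + bytes > maxFileSize) {
            rollOver();
        }
        activeSize += bytes;
    }

    /** file -> file.1, file.1 -> file.2, ...; backups beyond maxBackupIndex are deleted. */
    private void rollOver() {
        backups.addFirst(activeSize);
        while (backups.size() > maxBackupIndex) {
            backups.removeLast();
        }
        activeSize = 0;
    }

    int backupCount() {
        return backups.size();
    }

    long totalSize() {
        long total = activeSize;
        for (long size : backups) {
            total += size;
        }
        return total;
    }
}
```

With `maxFileSize = 100` and `maxBackupIndex = 3`, any number of 10-byte appends leaves at most three backups and at most 400 bytes in total. A single entry larger than `maxFileSize` is still written whole in this model, so the bound is approximate.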
[jira] [Commented] (FLINK-8504) TaskExecutor does not properly deregisters JobManager from JobLeaderService
[ https://issues.apache.org/jira/browse/FLINK-8504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345880#comment-16345880 ] ASF GitHub Bot commented on FLINK-8504: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5361 > TaskExecutor does not properly deregisters JobManager from JobLeaderService > --- > > Key: FLINK-8504 > URL: https://issues.apache.org/jira/browse/FLINK-8504 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: flip-6 > Fix For: 1.5.0 > > > The {{TaskExecutor}} should deregister jobs from the {{JobLeaderService}} > once it no longer holds slots for this job. The problem is that before > unregistering the job from the {{JobLeaderService}} in > {{TaskExecutor#freeInternal}}, the actual slot is freed which also removes > the {{JobID}} from the slot. Therefore, we lose the information to which job > the slot belonged. An easy solution would be to return a {{SlotInformation}} > object instead of the {{TaskSlot}} from {{TaskSlotTable#freeSlot}} which > contains the respective information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5361: [FLINK-8504] [flip6] Deregister jobs from the JobL...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5361 ---
[jira] [Closed] (FLINK-8504) TaskExecutor does not properly deregisters JobManager from JobLeaderService
[ https://issues.apache.org/jira/browse/FLINK-8504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8504. Resolution: Fixed Fixed via e94a488dd78e7c2efdf55a67cea886ee15a641a6 > TaskExecutor does not properly deregisters JobManager from JobLeaderService > --- > > Key: FLINK-8504 > URL: https://issues.apache.org/jira/browse/FLINK-8504 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: flip-6 > Fix For: 1.5.0 > > > The {{TaskExecutor}} should deregister jobs from the {{JobLeaderService}} > once it no longer holds slots for this job. The problem is that before > unregistering the job from the {{JobLeaderService}} in > {{TaskExecutor#freeInternal}}, the actual slot is freed which also removes > the {{JobID}} from the slot. Therefore, we lose the information to which job > the slot belonged. An easy solution would be to return a {{SlotInformation}} > object instead of the {{TaskSlot}} from {{TaskSlotTable#freeSlot}} which > contains the respective information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
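The fix described in the issue is about ordering: the job a slot belonged to must be captured before the slot is cleared. The sketch below models this with plain JDK types. `SlotInformation`, `SimpleSlotTable` and `SimpleTaskExecutor` are hypothetical simplifications (job IDs are plain strings, and a `Set` stands in for the `JobLeaderService` registrations), not the classes in the actual commit.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical: what a freed slot reports back, captured before the slot is cleared. */
final class SlotInformation {
    final int slotIndex;
    final String jobId;

    SlotInformation(int slotIndex, String jobId) {
        this.slotIndex = slotIndex;
        this.jobId = jobId;
    }
}

/** Hypothetical slot table mapping a slot index to the job that owns the slot. */
final class SimpleSlotTable {
    private final Map<Integer, String> slotToJob = new HashMap<>();

    void allocate(int slotIndex, String jobId) {
        slotToJob.put(slotIndex, jobId);
    }

    /** Frees the slot and returns the job it belonged to, or empty if it was not allocated. */
    Optional<SlotInformation> freeSlot(int slotIndex) {
        String jobId = slotToJob.remove(slotIndex);
        return jobId == null ? Optional.empty() : Optional.of(new SlotInformation(slotIndex, jobId));
    }

    boolean hasSlotsFor(String jobId) {
        return slotToJob.containsValue(jobId);
    }
}

/** Hypothetical executor-side logic: deregister a job once it holds no more slots. */
final class SimpleTaskExecutor {
    final SimpleSlotTable slotTable = new SimpleSlotTable();
    // Stands in for the jobs registered with the JobLeaderService.
    final Set<String> registeredJobs = new HashSet<>();

    void allocateSlot(int slotIndex, String jobId) {
        slotTable.allocate(slotIndex, jobId);
        registeredJobs.add(jobId);
    }

    void freeInternal(int slotIndex) {
        // The job ID survives the free because it travels in the returned SlotInformation.
        slotTable.freeSlot(slotIndex).ifPresent(info -> {
            if (!slotTable.hasSlotsFor(info.jobId)) {
                registeredJobs.remove(info.jobId);
            }
        });
    }
}
```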
[jira] [Created] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state
Stephan Ewen created FLINK-8531: --- Summary: Support separation of "Exclusive", "Shared" and "Task owned" state Key: FLINK-8531 URL: https://issues.apache.org/jira/browse/FLINK-8531 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.5.0 Currently, all state created at a certain checkpoint goes into the directory {{chk-id}}. With incremental checkpointing, some state is shared across checkpoints and is referenced by newer checkpoints. As a result, old {{chk-id}} directories stay around because they contain some shared chunks, which makes it hard for both users and cleanup hooks to determine when a {{chk-x}} directory can be deleted. The same holds for state that can only ever be dropped by certain operators on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples of such state are write-ahead logs, which need to be retained until the move to the target system is complete; in some cases that is later than the disposal of the checkpoint that created them. I propose to introduce different scopes for state:
- **EXCLUSIVE** is for state that belongs to one checkpoint only
- **SHARED** is for state that is possibly part of multiple checkpoints
- **TASKOWNED** is for state that must never be dropped by the JobManager.
For file-based checkpoint targets, I propose the following directory layout:
{code}
/user-defined-checkpoint-dir
    |
    + --shared/
    + --taskowned/
    + --chk-1/
    + --chk-2/
    + --chk-3/
    ...
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
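A minimal sketch of how the three proposed scopes could map onto the proposed directory layout. The enum `StateScope` and the class `CheckpointDirectoryLayout` are hypothetical names for illustration only. The point is that only EXCLUSIVE state lives under `chk-<id>`, so only that directory can be deleted wholesale when its checkpoint is discarded.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical state scopes mirroring the proposal in FLINK-8531. */
enum StateScope { EXCLUSIVE, SHARED, TASKOWNED }

/** Hypothetical resolver from (scope, checkpoint ID) to a directory in the proposed layout. */
final class CheckpointDirectoryLayout {
    private final Path root;

    CheckpointDirectoryLayout(String root) {
        this.root = Paths.get(root);
    }

    Path directoryFor(StateScope scope, long checkpointId) {
        switch (scope) {
            case EXCLUSIVE:
                return root.resolve("chk-" + checkpointId);
            case SHARED:
                return root.resolve("shared");
            case TASKOWNED:
                return root.resolve("taskowned");
            default:
                throw new IllegalArgumentException("Unknown scope: " + scope);
        }
    }

    /** Only exclusive state can be removed together with its checkpoint directory. */
    boolean deletedWithCheckpoint(StateScope scope) {
        return scope == StateScope.EXCLUSIVE;
    }
}
```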
[jira] [Resolved] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5823. - Resolution: Fixed Fixed via edc6f1000704a492629d7bdf8cbfa5ba5c45bb1f > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5823. --- > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8530) Enable detached job submission for RestClusterClient
Till Rohrmann created FLINK-8530: Summary: Enable detached job submission for RestClusterClient Key: FLINK-8530 URL: https://issues.apache.org/jira/browse/FLINK-8530 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 The {{RestClusterClient}} should also be able to submit jobs in detached mode. In detached mode, we don't wait for the {{JobExecutionResult}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
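The difference between attached and detached submission can be sketched with a `CompletableFuture`: both modes submit the job, but only attached mode waits for the result. `DetachedSubmissionSketch` is a hypothetical class and the result is modeled as a plain `String`; it does not reflect the actual `RestClusterClient` API or the `JobExecutionResult` type.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical client contrasting attached and detached submission; not the RestClusterClient API. */
final class DetachedSubmissionSketch {
    // Hypothetical cluster: accepts a job ID and returns a future completed when the job finishes.
    private final Function<String, CompletableFuture<String>> cluster;

    DetachedSubmissionSketch(Function<String, CompletableFuture<String>> cluster) {
        this.cluster = cluster;
    }

    Optional<String> submit(String jobId, boolean detached) {
        CompletableFuture<String> resultFuture = cluster.apply(jobId);
        if (detached) {
            // Detached: the job has been handed over; do not wait for its result.
            return Optional.empty();
        }
        // Attached: block until the job has finished and return its result.
        return Optional.of(resultFuture.join());
    }
}
```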
[jira] [Commented] (FLINK-8522) DefaultOperatorStateBackend writes data in checkpoint that is never read.
[ https://issues.apache.org/jira/browse/FLINK-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345395#comment-16345395 ] Kostas Kloudas commented on FLINK-8522: --- There is a commit for this here: [https://github.com/apache/flink/pull/5230] It is the first commit of that PR. > DefaultOperatorStateBackend writes data in checkpoint that is never read. > - > > Key: FLINK-8522 > URL: https://issues.apache.org/jira/browse/FLINK-8522 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.4.1 > > > In the DefaultOperatorStateBackend at line 283 we write in the checkpoint an > int declaring the number of the operator states that we include in the > checkpoint. > This number is never read when restoring and this can lead to confusion and > problems with backwards compatibility and/or extension of the types of state > we support (e.g. broadcast state). > There are two easy solutions, either remove the line and do not write the > size, or make sure that we also read this number when restoring and simply > ignore it. > I would go for the first one, i.e. remove the line. What do you think > [~richtesn] and [~tzulitai] ? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
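Both options in the issue preserve one invariant: every field the writer emits is consumed by the reader, in order. The sketch below illustrates the second option with `DataOutputStream`/`DataInputStream`. The format (an `int` count followed by UTF-encoded names) and the class `StateCountFormat` are hypothetical stand-ins, not the actual `DefaultOperatorStateBackend` layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical format: an int count followed by that many UTF-encoded state names. */
final class StateCountFormat {
    static byte[] write(List<String> stateNames) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(stateNames.size());
            for (String name : stateNames) {
                out.writeUTF(name);
            }
        }
        return bytes.toByteArray();
    }

    /** Consumes the count before anything else, so later reads stay aligned with the writer. */
    static List<String> read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Here the count bounds the loop; a reader that knows the number from
            // elsewhere could read it and discard it instead.
            int count = in.readInt();
            List<String> names = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                names.add(in.readUTF());
            }
            return names;
        }
    }
}
```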
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345394#comment-16345394 ] ASF GitHub Bot commented on FLINK-8345: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5230 @aljoscha and @tzulitai ready for another review. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5230: [FLINK-8345] Add iterator of keyed state on broadcast sid...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5230 @aljoscha and @tzulitai ready for another review. ---
[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8411: Fix Version/s: (was: 1.4.1) > HeapListState#add(null) will wipe out entire list state > --- > > Key: FLINK-8411 > URL: https://issues.apache.org/jira/browse/FLINK-8411 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.5.0 > > > You can see that {{HeapListState#add(null)}} will result in the whole state > being cleared or wiped out. There has never been a unit test for {{List#add(null)}} > in {{StateBackendTestBase}}. > {code:java} > // HeapListState > @Override > public void add(V value) { > final N namespace = currentNamespace; > if (value == null) { > clear(); > return; > } > final StateTable<K, N, ArrayList<V>> map = stateTable; > ArrayList<V> list = map.get(namespace); > if (list == null) { > list = new ArrayList<>(); > map.put(namespace, list); > } > list.add(value); > } > {code} > {code:java} > // RocksDBListState > @Override > public void add(V value) throws IOException { > try { > writeCurrentKeyWithGroupAndNamespace(); > byte[] key = keySerializationStream.toByteArray(); > keySerializationStream.reset(); > DataOutputViewStreamWrapper out = new > DataOutputViewStreamWrapper(keySerializationStream); > valueSerializer.serialize(value, out); > backend.db.merge(columnFamily, writeOptions, key, > keySerializationStream.toByteArray()); > } catch (Exception e) { > throw new RuntimeException("Error while adding data to > RocksDB", e); > } > } > {code} > The fix should make the behavior consistent between the two state > backends, and add a unit test for {{ListState#add(null)}}. For the > correct behavior, I believe adding null with {{add(null)}} should simply be > ignored without any consequences. > cc [~srichter] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-8411: - Reopened to change the fixVersion because I'm reverting this on 1.4.x: it is a semantic change that might break some user programs. > HeapListState#add(null) will wipe out entire list state > --- > > Key: FLINK-8411 > URL: https://issues.apache.org/jira/browse/FLINK-8411 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.5.0 > > > You can see that {{HeapListState#add(null)}} will result in the whole state > being cleared or wiped out. There has never been a unit test for {{List#add(null)}} > in {{StateBackendTestBase}}. > {code:java} > // HeapListState > @Override > public void add(V value) { > final N namespace = currentNamespace; > if (value == null) { > clear(); > return; > } > final StateTable<K, N, ArrayList<V>> map = stateTable; > ArrayList<V> list = map.get(namespace); > if (list == null) { > list = new ArrayList<>(); > map.put(namespace, list); > } > list.add(value); > } > {code} > {code:java} > // RocksDBListState > @Override > public void add(V value) throws IOException { > try { > writeCurrentKeyWithGroupAndNamespace(); > byte[] key = keySerializationStream.toByteArray(); > keySerializationStream.reset(); > DataOutputViewStreamWrapper out = new > DataOutputViewStreamWrapper(keySerializationStream); > valueSerializer.serialize(value, out); > backend.db.merge(columnFamily, writeOptions, key, > keySerializationStream.toByteArray()); > } catch (Exception e) { > throw new RuntimeException("Error while adding data to > RocksDB", e); > } > } > {code} > The fix should make the behavior consistent between the two state > backends, and add a unit test for {{ListState#add(null)}}.
For the > correct behavior, I believe adding null with {{add(null)}} should simply be > ignored without any consequences. > cc [~srichter] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
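The behavior proposed for 1.5.0, ignoring `null` instead of clearing the list, can be sketched with a namespace-keyed map. `NullIgnoringListState` is a hypothetical stand-in for a heap list state (no key groups, serializers or checkpointing), written only to make the intended semantics testable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical heap list state keyed by namespace, with add(null) treated as a no-op. */
final class NullIgnoringListState<N, V> {
    private final Map<N, List<V>> table = new HashMap<>();
    private N currentNamespace;

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    void add(V value) {
        if (value == null) {
            // Proposed behavior: ignore null instead of clearing the whole list.
            return;
        }
        table.computeIfAbsent(currentNamespace, ns -> new ArrayList<>()).add(value);
    }

    List<V> get() {
        List<V> list = table.get(currentNamespace);
        return list == null ? Collections.emptyList() : Collections.unmodifiableList(list);
    }

    void clear() {
        table.remove(currentNamespace);
    }
}
```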
[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden
[ https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345369#comment-16345369 ] ASF GitHub Bot commented on FLINK-8407: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5369 Ah dammit, you are right that we can't actually test that the method works correctly because there is no such method. That was stupid, sorry for that! 😅 > Setting the parallelism after a partitioning operation should be forbidden > -- > > Key: FLINK-8407 > URL: https://issues.apache.org/jira/browse/FLINK-8407 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Major > > Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} > create new {{DataStreams}}, which allow the users to set parallelisms for > them. However, the {{PartitionTransformations}} in these returned > {{DataStreams}} will only add virtual nodes, whose parallelisms could not be > specified, in the execution graph. We should forbid users from setting the > parallelism after a partitioning operation since it won't actually take effect. > The corresponding documentation should also be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5369 Ah dammit, you are right that we can't actually test that the method works correctly because there is no such method. That was stupid, sorry for that! 😅 ---
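One way to express the proposal is to give the result of a partitioning call its own type whose `setParallelism` fails fast, since the underlying virtual node has no parallelism of its own. `SketchStream` and `SketchPartitionedStream` are hypothetical classes, not Flink's `DataStream` API; with this shape, a test only needs to assert that the call throws.

```java
/** Hypothetical stream with a settable parallelism; not Flink's DataStream API. */
class SketchStream {
    private int parallelism = 1;

    SketchStream setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("Parallelism must be at least 1.");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    /** A partitioning operation only adds a virtual node, so its result gets a dedicated type. */
    SketchStream rescale() {
        return new SketchPartitionedStream(this);
    }
}

/** Hypothetical result of a partitioning call: it inherits its input's parallelism. */
final class SketchPartitionedStream extends SketchStream {
    private final SketchStream input;

    SketchPartitionedStream(SketchStream input) {
        this.input = input;
    }

    @Override
    SketchStream setParallelism(int parallelism) {
        // Fail fast instead of silently accepting a setting that would never take effect.
        throw new UnsupportedOperationException("Cannot set the parallelism after a partitioning operation.");
    }

    @Override
    int getParallelism() {
        return input.getParallelism();
    }
}
```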
[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators
[ https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345340#comment-16345340 ] ASF GitHub Bot commented on FLINK-4812: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5155 I like this a lot now! 👍 One last comment/idea I had is this: I don't like that `StreamTask` has `getInputWatermarkGauge()` only because we need it in the `OperatorChain` to set it on the head operator. Could this not be set at the end of `OneInputStreamTask.init()` the same way it is set for `TwoInputStreamTask.init()` (where we then also would have to set the min-input-watermark)? > Report Watermark metrics in all operators > - > > Key: FLINK-4812 > URL: https://issues.apache.org/jira/browse/FLINK-4812 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > As reported by a user, Flink does not currently export the current low > watermark for sources > (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html). > This JIRA is for adding such a metric for the sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5155: [FLINK-4812][metrics] Expose currentLowWatermark for all ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5155 I like this a lot now! 👍 One last comment/idea I had is this: I don't like that `StreamTask` has `getInputWatermarkGauge()` only because we need it in the `OperatorChain` to set it on the head operator. Could this not be set at the end of `OneInputStreamTask.init()` the same way it is set for `TwoInputStreamTask.init()` (where we then also would have to set the min-input-watermark)? ---
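The "min-input-watermark" mentioned for `TwoInputStreamTask` is the minimum of the two inputs' current watermarks. A pure-JDK sketch of such a gauge follows, using `LongSupplier` in place of Flink's `Gauge` interface. `WatermarkGauge` and `MinWatermarkGauge` are hypothetical here, and starting at `Long.MIN_VALUE` (no watermark seen yet) is an assumption.

```java
import java.util.function.LongSupplier;

/** Hypothetical gauge holding the last watermark observed on one input. */
final class WatermarkGauge implements LongSupplier {
    // volatile: metric reporters typically read from a different thread than the task.
    private volatile long currentWatermark = Long.MIN_VALUE;

    void setCurrentWatermark(long watermark) {
        this.currentWatermark = watermark;
    }

    @Override
    public long getAsLong() {
        return currentWatermark;
    }
}

/** Hypothetical gauge reporting the minimum of two input watermarks. */
final class MinWatermarkGauge implements LongSupplier {
    private final LongSupplier first;
    private final LongSupplier second;

    MinWatermarkGauge(LongSupplier first, LongSupplier second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public long getAsLong() {
        return Math.min(first.getAsLong(), second.getAsLong());
    }
}
```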
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345247#comment-16345247 ] ASF GitHub Bot commented on FLINK-7608: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5161#discussion_r164780848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -194,14 +190,20 @@ public void setup(StreamTask containingTask, StreamConfig config, Output LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5161: [FLINK-7608][metric] Refactor latency statistics m...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5161#discussion_r164780848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -194,14 +190,20 @@ public void setup(StreamTask containingTask, StreamConfig config, Output
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345237#comment-16345237 ] ASF GitHub Bot commented on FLINK-7608: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5161 @yew1eb by `operatorName`, do you mean `name()` or `uid()`? I think both of these could make sense. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5161: [FLINK-7608][metric] Refactor latency statistics metric
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5161 @yew1eb by `operatorName`, do you mean `name()` or `uid()`? I think both of these could make sense. ---
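The reporter output in FLINK-7608 packs per-source latency statistics into a single gauge value. The sketch below keeps a separate sample list per `(vertexID, subtaskIndex)` source and computes a nearest-rank percentile. `LatencyStatsSketch` is hypothetical, and nearest-rank is a simple choice that need not match how Flink's histogram computes percentiles (the `p50=59.5` in the log suggests interpolation between samples).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-source latency statistics; not Flink's implementation. */
final class LatencyStatsSketch {
    private final Map<String, List<Long>> samplesBySource = new HashMap<>();

    private static String key(int vertexId, int subtaskIndex) {
        return vertexId + "/" + subtaskIndex;
    }

    /** Records one latency sample (in milliseconds) for the given source. */
    void record(int vertexId, int subtaskIndex, long latencyMillis) {
        samplesBySource.computeIfAbsent(key(vertexId, subtaskIndex), k -> new ArrayList<>()).add(latencyMillis);
    }

    /** Nearest-rank percentile, q in (0, 100], over the samples of one source. */
    long percentile(int vertexId, int subtaskIndex, double q) {
        List<Long> samples = samplesBySource.get(key(vertexId, subtaskIndex));
        if (samples == null || samples.isEmpty()) {
            throw new IllegalStateException("No samples for this source.");
        }
        List<Long> sorted = new ArrayList<>(samples);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(q / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }
}
```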
[jira] [Commented] (FLINK-8525) Improve queryable state code examples
[ https://issues.apache.org/jira/browse/FLINK-8525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345234#comment-16345234 ] Kostas Kloudas commented on FLINK-8525: --- Yes, this is something that is not included in the documentation. > Improve queryable state code examples > - > > Key: FLINK-8525 > URL: https://issues.apache.org/jira/browse/FLINK-8525 > Project: Flink > Issue Type: Improvement > Components: Documentation, Queryable State >Affects Versions: 1.4.0 >Reporter: jia liu >Priority: Minor > > It's really very hard to understand the code in the documentation. > {code:java} > QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort); > // the state descriptor of the state to be fetched. > ValueStateDescriptor<Tuple2<Long, Long>> descriptor = > new ValueStateDescriptor<>( > "average", > TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), > Tuple2.of(0L, 0L)); > CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture = > client.getKvState(jobId, "query-name", key, > BasicTypeInfo.LONG_TYPE_INFO, descriptor); > // now handle the returned value > resultFuture.thenAccept(response -> { > try { > Tuple2<Long, Long> res = response.get(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > {code} > I can't tell from it how key and jobId are declared. Most importantly, > there is no test case in the flink-queryable-state module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8525) Improve queryable state code examples
[ https://issues.apache.org/jira/browse/FLINK-8525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345221#comment-16345221 ] Chesnay Schepler commented on FLINK-8525: - One thing that should be shown is how to actually create a {{JobID}} from the ID shown in the UI, i.e. "{{JobID jobId = JobID.fromHexString("ABCDE");}}" > Improve queryable state code examples > - > > Key: FLINK-8525 > URL: https://issues.apache.org/jira/browse/FLINK-8525 > Project: Flink > Issue Type: Improvement > Components: Documentation, Queryable State >Affects Versions: 1.4.0 >Reporter: jia liu >Priority: Minor > > It's really very hard to understand the code in the documentation. > {code:java} > QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort); > // the state descriptor of the state to be fetched. > ValueStateDescriptor<Tuple2<Long, Long>> descriptor = > new ValueStateDescriptor<>( > "average", > TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), > Tuple2.of(0L, 0L)); > CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture = > client.getKvState(jobId, "query-name", key, > BasicTypeInfo.LONG_TYPE_INFO, descriptor); > // now handle the returned value > resultFuture.thenAccept(response -> { > try { > Tuple2<Long, Long> res = response.get(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > {code} > I can't tell from it how key and jobId are declared. Most importantly, > there is no test case in the flink-queryable-state module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
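For context on `JobID.fromHexString`: a Flink `JobID` is a 16-byte identifier that the web UI prints as 32 hexadecimal characters, and `JobID.fromHexString` converts that string back. The pure-JDK `HexIdParser` below is a hypothetical illustration of the parsing step only, not Flink code; in client code, call `JobID.fromHexString` directly. In this sketch, a short placeholder such as `"ABCDE"` is rejected.

```java
/** Hypothetical helper: parses a 32-character hex ID (as shown in the web UI) into 16 bytes. */
final class HexIdParser {
    static final int ID_BYTES = 16;

    static byte[] parse(String hex) {
        if (hex.length() != 2 * ID_BYTES) {
            throw new IllegalArgumentException(
                "Expected " + (2 * ID_BYTES) + " hex characters but got " + hex.length() + ": " + hex);
        }
        byte[] bytes = new byte[ID_BYTES];
        for (int i = 0; i < ID_BYTES; i++) {
            int high = Character.digit(hex.charAt(2 * i), 16);
            int low = Character.digit(hex.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("Not a hex string: " + hex);
            }
            bytes[i] = (byte) ((high << 4) | low);
        }
        return bytes;
    }
}
```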
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345220#comment-16345220 ] ASF GitHub Bot commented on FLINK-8479: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164776439 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bound
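The join condition in the operator's Javadoc can be checked in isolation. The sketch below is a hypothetical predicate, not the operator itself. `matches` tests `right.ts` against `[left.ts + lowerBound, left.ts + upperBound]`, and `matchesFromRight` evaluates the same condition from the right element's point of view, where the interval flips to `[right.ts - upperBound, right.ts - lowerBound]`. That this flipped interval is what the `inverseLowerBound`/`inverseUpperBound` fields are for is an assumption.

```java
/** Hypothetical predicate for the time-bounded join condition; not the operator itself. */
final class TimeBoundedJoinCondition {
    private final long lowerBound;
    private final long upperBound;
    private final boolean lowerInclusive;
    private final boolean upperInclusive;

    TimeBoundedJoinCondition(long lowerBound, boolean lowerInclusive, long upperBound, boolean upperInclusive) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must not exceed upperBound");
        }
        this.lowerBound = lowerBound;
        this.lowerInclusive = lowerInclusive;
        this.upperBound = upperBound;
        this.upperInclusive = upperInclusive;
    }

    /** Left element's view: rightTs in [leftTs + lowerBound, leftTs + upperBound]. */
    boolean matches(long leftTs, long rightTs) {
        long from = leftTs + lowerBound;
        long to = leftTs + upperBound;
        boolean afterFrom = lowerInclusive ? rightTs >= from : rightTs > from;
        boolean beforeTo = upperInclusive ? rightTs <= to : rightTs < to;
        return afterFrom && beforeTo;
    }

    /** Right element's view of the same condition: leftTs in [rightTs - upperBound, rightTs - lowerBound]. */
    boolean matchesFromRight(long rightTs, long leftTs) {
        long from = rightTs - upperBound;
        long to = rightTs - lowerBound;
        boolean afterFrom = upperInclusive ? leftTs >= from : leftTs > from;
        boolean beforeTo = lowerInclusive ? leftTs <= to : leftTs < to;
        return afterFrom && beforeTo;
    }
}
```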
[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164776439 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive.
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bound for evaluating if elements should be joined +* @param upperBound The upper bound for evaluating if elements should be joined +* @param lowerBoundInclusive Whether or not to include elements where the timestamp matches +
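The join condition in this Javadoc, t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound] with each end independently inclusive or exclusive, reduces to comparing the timestamp difference against both bounds. A minimal sketch, assuming `long` millisecond timestamps; `JoinBounds` and `shouldJoin` are hypothetical names, not part of the PR:

```java
/**
 * Hypothetical helper illustrating the join condition described in the
 * TimeBoundedStreamJoinOperator Javadoc:
 * rightTs in [leftTs + lowerBound, leftTs + upperBound],
 * where each end of the range can be inclusive or exclusive.
 */
final class JoinBounds {

    private JoinBounds() {}

    static boolean shouldJoin(
            long leftTs,
            long rightTs,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {

        // Position of the right element relative to the left one.
        long diff = rightTs - leftTs;

        // Check each end of the range according to its inclusiveness flag.
        boolean aboveLower = lowerBoundInclusive ? diff >= lowerBound : diff > lowerBound;
        boolean belowUpper = upperBoundInclusive ? diff <= upperBound : diff < upperBound;
        return aboveLower && belowUpper;
    }
}
```

Computing `rightTs - leftTs` once keeps both checks relative to the left element, which matches how the Javadoc phrases the condition; timestamps near `Long.MIN_VALUE` or `Long.MAX_VALUE` could overflow and are not handled in this sketch.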
[jira] [Closed] (FLINK-4765) Migrate ConfigConstants to ConfigOptions
[ https://issues.apache.org/jira/browse/FLINK-4765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4765. --- Resolution: Fixed Fix Version/s: 1.5.0 The majority of options have been ported; a few outliers are left and will be ported as part of FLINK-8475. > Migrate ConfigConstants to ConfigOptions > > > Key: FLINK-4765 > URL: https://issues.apache.org/jira/browse/FLINK-4765 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > > This issue has multiple subtasks -- This message was sent by Atlassian JIRA (v7.6.3#76005)
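FLINK-4765 replaces untyped `ConfigConstants` entries (a key string plus a separate default constant) with typed `ConfigOption` objects that carry both. In Flink itself these are built with `ConfigOptions.key(...).defaultValue(...)`; to stay runnable without Flink on the classpath, the sketch below uses a hypothetical `TypedOption<T>` and a plain `Map<String, String>` as the configuration. The key `example.port` and the default `1234` are placeholders:

```java
import java.util.Map;
import java.util.function.Function;

/** Before: key and default live in two unrelated constants (old ConfigConstants style). */
final class OldStyleConstants {

    private OldStyleConstants() {}

    static final String EXAMPLE_PORT_KEY = "example.port"; // hypothetical key
    static final int DEFAULT_EXAMPLE_PORT = 1234;          // hypothetical default

    /** Every call site has to pair the right key with the right default by hand. */
    static int readPort(Map<String, String> conf) {
        String raw = conf.get(EXAMPLE_PORT_KEY);
        return raw == null ? DEFAULT_EXAMPLE_PORT : Integer.parseInt(raw);
    }
}

/** After: a hypothetical typed option bundling key, default value and parser. */
final class TypedOption<T> {

    final String key;
    final T defaultValue;
    private final Function<String, T> parser;

    TypedOption(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }

    /** Reads the option from a raw string map, falling back to the default. */
    T get(Map<String, String> conf) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : parser.apply(raw);
    }
}
```

The benefit of the migration is that a default can no longer drift away from its key, because both live in one object that every call site shares.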
[jira] [Assigned] (FLINK-4767) Migrate JobManager configuration options
[ https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-4767: --- Assignee: (was: Chesnay Schepler) > Migrate JobManager configuration options > > > Key: FLINK-4767 > URL: https://issues.apache.org/jira/browse/FLINK-4767 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Local Runtime >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4767) Migrate JobManager configuration options
[ https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4767. --- Resolution: Fixed Fix Version/s: 1.5.0 Was fixed in the mean-time. > Migrate JobManager configuration options > > > Key: FLINK-4767 > URL: https://issues.apache.org/jira/browse/FLINK-4767 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Local Runtime >Reporter: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4767) Migrate JobManager configuration options
[ https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-4767: --- Assignee: Chesnay Schepler > Migrate JobManager configuration options > > > Key: FLINK-4767 > URL: https://issues.apache.org/jira/browse/FLINK-4767 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Local Runtime >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345206#comment-16345206 ] ASF GitHub Bot commented on FLINK-8479: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164773927 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bou
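The operator declares `inverseLowerBound` and `inverseUpperBound` next to the regular bounds, but the excerpt stops before they are initialized. One plausible reading, assumed here rather than taken from the diff: a right element at t2 matches left elements with t1 ∈ [t2 - upperBound, t2 - lowerBound], so the inverse bounds are the regular bounds negated and swapped. `InverseBounds` is a hypothetical helper:

```java
/**
 * Hypothetical sketch: derives the range of left timestamps that can join a
 * right element, assuming inverseLowerBound = -upperBound and
 * inverseUpperBound = -lowerBound (the diff excerpt does not show this).
 */
final class InverseBounds {

    final long inverseLowerBound;
    final long inverseUpperBound;

    InverseBounds(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        // rightTs - leftTs in [lowerBound, upperBound]
        //   <=> leftTs - rightTs in [-upperBound, -lowerBound]
        this.inverseLowerBound = -upperBound;
        this.inverseUpperBound = -lowerBound;
    }

    /** Smallest left timestamp that may join a right element at rightTs. */
    long minLeftTimestamp(long rightTs) {
        return rightTs + inverseLowerBound;
    }

    /** Largest left timestamp that may join a right element at rightTs. */
    long maxLeftTimestamp(long rightTs) {
        return rightTs + inverseUpperBound;
    }
}
```

Inclusiveness would swap sides the same way: under this reading, the inverse lower end inherits `upperBoundInclusive` and the inverse upper end inherits `lowerBoundInclusive`. Rejecting `lowerBound > upperBound` is a choice of this sketch, not something the excerpt states.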
[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164773927 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive.
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bound for evaluating if elements should be joined +* @param upperBound The upper bound for evaluating if elements should be joined +* @param lowerBoundInclusive Whether or not to include elements where the timestamp matches +
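Each side is buffered in `MapState`, and the operator fixes `bucketGranularity` to 1 with a TODO to make it adaptable, which suggests elements are grouped into time buckets. The sketch below shows what a coarser granularity could look like, using a `TreeMap` as an in-memory stand-in for `MapState`; `BucketedBuffer` and the floor-division bucketing scheme are assumptions, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for a per-side buffer: elements are grouped
 * into buckets of width `granularity`. A TreeMap replaces Flink's MapState
 * purely for illustration.
 */
final class BucketedBuffer<T> {

    private final long granularity;
    private final TreeMap<Long, List<T>> buckets = new TreeMap<>();

    BucketedBuffer(long granularity) {
        if (granularity <= 0) {
            throw new IllegalArgumentException("granularity must be positive");
        }
        this.granularity = granularity;
    }

    /** Bucket key for a timestamp; floorDiv keeps negative timestamps in the right bucket. */
    long bucketOf(long timestamp) {
        return Math.floorDiv(timestamp, granularity) * granularity;
    }

    void add(long timestamp, T element) {
        buckets.computeIfAbsent(bucketOf(timestamp), k -> new ArrayList<>()).add(element);
    }

    /** Returns all elements from buckets overlapping [from, to] (both inclusive). */
    List<T> candidates(long from, long to) {
        List<T> result = new ArrayList<>();
        if (from > to) {
            return result; // empty range
        }
        for (Map.Entry<Long, List<T>> e
                : buckets.subMap(bucketOf(from), true, bucketOf(to), true).entrySet()) {
            result.addAll(e.getValue());
        }
        return result;
    }

    int bucketCount() {
        return buckets.size();
    }
}
```

With granularity 1, as in the operator, every bucket holds exactly one timestamp. With a coarser granularity, `candidates` over-approximates the range, so each returned element would still need an exact bounds check against its own timestamp.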
[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables
[ https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345198#comment-16345198 ] ASF GitHub Bot commented on FLINK-8475: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5392 [FLINK-8475][config][docs] Integrate JM options ## What is the purpose of the change This PR integrates the JobManager `ConfigOptions` into the configuration docs generator. ## Brief change log * Add missing descriptions to config options (derived from existing description/javadocs) * integrate jobmanager configuration table into `config.md` and separate job- and taskmanager sections You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8475_jm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5392.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5392 commit 61bb0eff965cd066edebc5e4a167dd9bd3a07f77 Author: zentol Date: 2018-01-22T16:32:38Z [FLINK-8475][config][docs] Integrate JM options > Move remaining sections to generated tables > --- > > Key: FLINK-8475 > URL: https://issues.apache.org/jira/browse/FLINK-8475 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
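The PR feeds the JobManager options into the configuration docs generator so that `config.md` can include a generated table rather than hand-maintained text. The generator's real input and output formats are not shown in this thread; the sketch below only illustrates the idea of rendering option metadata (key, default, description) into an HTML table, sorted by key and with escaped cells. `OptionDoc` and `OptionTableRenderer` are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical row describing one documented option. */
final class OptionDoc {

    final String key;
    final String defaultValue;
    final String description;

    OptionDoc(String key, String defaultValue, String description) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.description = description;
    }
}

/** Minimal sketch of turning option metadata into a docs table. */
final class OptionTableRenderer {

    private OptionTableRenderer() {}

    static String render(List<OptionDoc> options) {
        // Sort a copy by key so the generated table is stable across runs.
        List<OptionDoc> sorted = new ArrayList<>(options);
        sorted.sort(Comparator.comparing((OptionDoc o) -> o.key));

        StringBuilder sb = new StringBuilder();
        sb.append("<table>\n");
        sb.append("  <tr><th>Key</th><th>Default</th><th>Description</th></tr>\n");
        for (OptionDoc o : sorted) {
            sb.append("  <tr><td>").append(escape(o.key))
              .append("</td><td>").append(escape(o.defaultValue))
              .append("</td><td>").append(escape(o.description))
              .append("</td></tr>\n");
        }
        sb.append("</table>\n");
        return sb.toString();
    }

    /** Escapes the characters that would break the surrounding HTML ('&' first). */
    private static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }
}
```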
[GitHub] flink pull request #5392: [FLINK-8475][config][docs] Integrate JM options
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5392 [FLINK-8475][config][docs] Integrate JM options ## What is the purpose of the change This PR integrates the JobManager `ConfigOptions` into the configuration docs generator. ## Brief change log * Add missing descriptions to config options (derived from existing description/javadocs) * integrate jobmanager configuration table into `config.md` and separate job- and taskmanager sections You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8475_jm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5392.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5392 commit 61bb0eff965cd066edebc5e4a167dd9bd3a07f77 Author: zentol Date: 2018-01-22T16:32:38Z [FLINK-8475][config][docs] Integrate JM options ---
[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables
[ https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345185#comment-16345185 ] ASF GitHub Bot commented on FLINK-8475: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5391 [FLINK-8475][config][docs] Integrate BlobServer options ## What is the purpose of the change This PR adds the BlobServer `ConfigOptions` to the full configuration reference. ## Brief change log * Add missing descriptions to config options (derived from existing description/javadocs) * integrate BlobServer configuration table into `config.md` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8475_blob Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5391.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5391 commit 4646481d5c398ee27c2c1600eb23e10009074e20 Author: zentol Date: 2018-01-23T13:44:00Z [FLINK-8475][config][docs] Integrate BlobServer options > Move remaining sections to generated tables > --- > > Key: FLINK-8475 > URL: https://issues.apache.org/jira/browse/FLINK-8475 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5391: [FLINK-8475][config][docs] Integrate BlobServer op...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5391 [FLINK-8475][config][docs] Integrate BlobServer options ## What is the purpose of the change This PR adds the BlobServer `ConfigOptions` to the full configuration reference. ## Brief change log * Add missing descriptions to config options (derived from existing description/javadocs) * integrate BlobServer configuration table into `config.md` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8475_blob Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5391.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5391 commit 4646481d5c398ee27c2c1600eb23e10009074e20 Author: zentol Date: 2018-01-23T13:44:00Z [FLINK-8475][config][docs] Integrate BlobServer options ---
[jira] [Commented] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
[ https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345170#comment-16345170 ] ASF GitHub Bot commented on FLINK-8476: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5338 > ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused > -- > > Key: FLINK-8476 > URL: https://issues.apache.org/jira/browse/FLINK-8476 > Project: Flink > Issue Type: Improvement > Components: Configuration >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably > be deprecated. > [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6623) BlobCacheSuccessTest fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6623. --- Resolution: Fixed Fix Version/s: 1.5.0 master: e135c3a5e2a42911f4c7f744003f5804c25d2dd9 > BlobCacheSuccessTest fails on Windows > - > > Key: FLINK-6623 > URL: https://issues.apache.org/jira/browse/FLINK-6623 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0, 1.5.0 > Environment: windows 10, java 1.8 >Reporter: constantine stanley >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > All tests in {{BlobCacheSuccessTest}} fail on Windows. > {code} > java.nio.file.FileSystemException: > C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp- > -> > C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af: > The process cannot access the file because it is being used by another > process. 
> at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387) > at > sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287) > at java.nio.file.Files.move(Files.java:1395) > at > org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464) > at > org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708) > at > org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608) > at > org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568) > at > org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778) > at > org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173) > at > org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
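The stack trace shows `Files.move` in `BlobUtils.moveTempFileToStore` failing with "The process cannot access the file because it is being used by another process". On Windows a file that still has an open handle generally cannot be moved or deleted, while POSIX systems allow it, which is consistent with the tests failing only on Windows. The actual fix in PR #5351 is not shown here (its title is truncated), so the sketch below only illustrates the general pattern of closing the write stream before moving the temp file into place; `TempFileStore.writeThenMove` is a hypothetical helper:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class TempFileStore {

    private TempFileStore() {}

    /**
     * Writes data to a temporary file and only then moves it to the target.
     * The inner try-with-resources closes the stream before Files.move runs,
     * so no handle to the temp file is open during the move.
     */
    static Path writeThenMove(byte[] data, Path incomingDir, Path target) throws IOException {
        Path temp = Files.createTempFile(incomingDir, "temp-", ".tmp");
        try {
            try (OutputStream out = Files.newOutputStream(temp)) {
                out.write(data);
            } // stream is closed here, before the move

            Path parent = target.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            return Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // If anything above failed, do not leave the temp file behind.
            Files.deleteIfExists(temp);
        }
    }
}
```

`ATOMIC_MOVE` assumes the incoming directory and the target live on the same file store; otherwise `Files.move` throws `AtomicMoveNotSupportedException`.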
[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator
[ https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345166#comment-16345166 ] ASF GitHub Bot commented on FLINK-8479: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164766198 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. 
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bound
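The `LAST_CLEANUP_LEFT` and `LAST_CLEANUP_RIGHT` state suggests buffered elements are pruned as event time advances, but the excerpt ends before the cleanup logic. From the join condition alone, and assuming no late data (after watermark w, no element with timestamp ≤ w arrives), a left element at t1 only matches right elements with t2 ≤ t1 + upperBound, and a right element at t2 only matches left elements with t1 ≤ t2 - lowerBound. A sketch of the resulting eviction checks; `CleanupRule` is hypothetical and the PR may use a different rule:

```java
/**
 * Hypothetical helper deciding whether a buffered element can be evicted.
 * Assumes the join condition rightTs - leftTs in [lowerBound, upperBound]
 * and no late data: after watermark w, every new element has timestamp > w.
 */
final class CleanupRule {

    private CleanupRule() {}

    /** A left element can only match right elements with ts <= leftTs + upperBound. */
    static boolean canEvictLeft(long leftTs, long upperBound, long watermark) {
        return leftTs + upperBound <= watermark;
    }

    /** A right element can only match left elements with ts <= rightTs - lowerBound. */
    static boolean canEvictRight(long rightTs, long lowerBound, long watermark) {
        return rightTs - lowerBound <= watermark;
    }
}
```

Evicting when `t1 + upperBound == w` is safe under this assumption because any future right element has a timestamp strictly greater than w.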
[GitHub] flink pull request #5351: [FLINK-6623][Blob] BlobServer#putBuffer moves file...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5351 ---
[jira] [Commented] (FLINK-8494) Migrate CC#DEFAULT_PARALLELISM_KEY
[ https://issues.apache.org/jira/browse/FLINK-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345172#comment-16345172 ] ASF GitHub Bot commented on FLINK-8494: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5377 > Migrate CC#DEFAULT_PARALLELISM_KEY > -- > > Key: FLINK-8494 > URL: https://issues.apache.org/jira/browse/FLINK-8494 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > {{ConfigConstants#DEFAULT_PARALLELISM_KEY}} was only partially migrated to a > ConfigOption in FLINK-4770. > A ConfigOption was created but the usages weren't replaced. The option is > also called {{DEFAULT_PARALLELISM_KEY}} when it should be called > {{DEFAULT_PARALLELISM}} to be consistent with all other config options. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
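Besides replacing the remaining usages, the issue asks for the option to be renamed from `DEFAULT_PARALLELISM_KEY` to `DEFAULT_PARALLELISM`. One common way to rename a public constant without breaking existing callers is to keep the old name as a `@Deprecated` alias; whether the actual change did this is not stated here. The sketch uses a placeholder `String` key instead of a real `ConfigOption<Integer>`, and `ParallelismOptionsSketch` is hypothetical:

```java
/**
 * Hypothetical sketch of renaming a public option constant without breaking
 * existing callers: the old name stays as a deprecated alias of the new one.
 * The key string is a placeholder, not necessarily Flink's actual key.
 */
final class ParallelismOptionsSketch {

    private ParallelismOptionsSketch() {}

    /** Consistent name: no "_KEY" suffix, matching the other options. */
    static final String DEFAULT_PARALLELISM = "example.parallelism.default";

    /**
     * Old name, kept only for source compatibility.
     *
     * @deprecated use {@link #DEFAULT_PARALLELISM} instead.
     */
    @Deprecated
    static final String DEFAULT_PARALLELISM_KEY = DEFAULT_PARALLELISM;
}
```

Callers still using the old name keep compiling but get a deprecation warning, which gives them a release cycle to switch before the alias is removed.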
[jira] [Commented] (FLINK-6623) BlobCacheSuccessTest fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345171#comment-16345171 ] ASF GitHub Bot commented on FLINK-6623: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5351 > BlobCacheSuccessTest fails on Windows > - > > Key: FLINK-6623 > URL: https://issues.apache.org/jira/browse/FLINK-6623 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0, 1.5.0 > Environment: windows 10, java 1.8 >Reporter: constantine stanley >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > All tests in {{BlobCacheSuccessTest}} fail on Windows. > {code} > java.nio.file.FileSystemException: > C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp- > -> > C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af: > The process cannot access the file because it is being used by another > process. 
> at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387) > at > sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287) > at java.nio.file.Files.move(Files.java:1395) > at > org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464) > at > org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708) > at > org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608) > at > org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568) > at > org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778) > at > org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173) > at > org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code}
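The trace shows `Files.move` failing inside `BlobUtils.moveTempFileToStore` with "The process cannot access the file because it is being used by another process", i.e. Windows refuses to move a file that still has an open handle. One common way to avoid this, assumed here rather than taken from the PR (its title is truncated above), is to fully close the temp file's stream before moving it. A minimal sketch using only `java.nio.file`; `writeThenMove` is a hypothetical helper, not a `BlobServer` method.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating "close before move"; not Flink's BlobServer code.
class TempFileMoveSketch {

    // Writes data to a temp file in storageDir, closes it, then moves it to targetName.
    static Path writeThenMove(Path storageDir, String targetName, byte[] data) {
        try {
            Path incoming = Files.createTempFile(storageDir, "temp-", ".tmp");
            // try-with-resources closes the stream before the move below; on Windows
            // a still-open handle makes Files.move fail with a FileSystemException.
            try (OutputStream out = Files.newOutputStream(incoming)) {
                out.write(data);
            }
            Path target = storageDir.resolve(targetName);
            return Files.move(incoming, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The ordering is the whole technique: the move happens strictly after the writer's handle is released, which is harmless on POSIX systems and required on Windows.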
[jira] [Closed] (FLINK-6464) Metric name is not stable
[ https://issues.apache.org/jira/browse/FLINK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6464. --- Resolution: Fixed Fix Version/s: (was: 1.4.1) master: 7d4bd4b6d6710a7e81a7883aee1947a76d564b9a > Metric name is not stable > - > > Key: FLINK-6464 > URL: https://issues.apache.org/jira/browse/FLINK-6464 > Project: Flink > Issue Type: Bug > Components: DataStream API, Metrics >Affects Versions: 1.2.0 >Reporter: Andrey >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > Currently, according to the documentation > (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html), > operator metrics are constructed using the following pattern: > , > For some operators, "operator_name" could contain the output of the default > toString implementation. For example: > {code} > TriggerWindow(TumblingProcessingTimeWindows(3000), > ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@c65792d4}, > xxx.Trigger@665fe457, WindowedStream.apply(WindowedStream.java:521)) -> > Sink: Unnamed > {code} > The part "@c65792d4" changes every time the job is restarted or cancelled. > As a consequence, it's not possible to store metrics for a long time. > Expected: > * ensure all operators return human-readable, non-default names, OR > * change the way TriggerWindow generates its name.
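Suffixes like `@c65792d4` come from `Object.toString()`, which, when not overridden, returns the class name, `@`, and the hex identity hash code; that hash differs from run to run, so any name built from it is unstable. The sketch below contrasts the two cases with hypothetical class names; it illustrates the cause only and is not necessarily how PR #5332 stabilized the window operator names.

```java
// Hypothetical classes illustrating why default toString() output is unstable.
class MetricNameSketch {

    // Does not override toString(), so it inherits Object's ClassName@hexHash form.
    static final class DefaultNamedTrigger {
    }

    // Overrides toString() with a deterministic, human-readable name.
    static final class StableNamedTrigger {
        @Override
        public String toString() {
            return "StableNamedTrigger";
        }
    }

    // Builds an operator-style name from a component's toString(), as in the issue's example.
    static String operatorName(Object trigger) {
        return "TriggerWindow(" + trigger + ")";
    }
}
```

`operatorName(new StableNamedTrigger())` always yields `TriggerWindow(StableNamedTrigger)`, whereas the default variant embeds a per-run hash.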
[jira] [Commented] (FLINK-6464) Metric name is not stable
[ https://issues.apache.org/jira/browse/FLINK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345169#comment-16345169 ] ASF GitHub Bot commented on FLINK-6464: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5332
[GitHub] flink pull request #5338: [FLINK-8476][config][HA] Deprecate HA config const...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5338 ---
[jira] [Closed] (FLINK-5659) FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows
[ https://issues.apache.org/jira/browse/FLINK-5659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-5659. --- Resolution: Fixed Fix Version/s: 1.5.0 master: 2e63d5a8ec1fa874a61061b72b970879f14c86d9 > FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows > -- > > Key: FLINK-5659 > URL: https://issues.apache.org/jira/browse/FLINK-5659 > Project: Flink > Issue Type: Bug > Components: Core, Local Runtime >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > The {code}FileBaseUtils#deleteFileOrDirectory{code} is not thread-safe on > Windows. > First, you run into AccessDeniedExceptions, since one thread tries to > delete a file while another thread is already doing so, which requires the > file to be opened. > Once you resolve those exceptions (by catching them and double-checking whether > the file still exists), you run into DirectoryNotEmptyExceptions, since there > is some wacky timing/visibility issue when deleting files concurrently.
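The workaround described for the first failure, catching the `AccessDeniedException` and then re-checking whether the file still exists, can be sketched with `java.nio.file` as below. `deleteTolerantly` is a hypothetical helper, not the code committed for this issue, and it does not address the `DirectoryNotEmptyException` timing problem that the description also reports.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical sketch of a delete that tolerates concurrent deleters; not Flink's code.
class TolerantDeleteSketch {

    // Recursively deletes path, tolerating another thread deleting the same entries.
    static void deleteTolerantly(Path path) {
        try {
            if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
                try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
                    for (Path child : children) {
                        deleteTolerantly(child);
                    }
                } catch (NoSuchFileException e) {
                    return; // directory was already removed by someone else
                }
            }
            try {
                Files.deleteIfExists(path); // an already-missing entry is not an error
            } catch (AccessDeniedException e) {
                // On Windows this can mean a concurrent delete has the file open;
                // only fail if the file is still there when we re-check.
                if (Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
                    throw e;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Treating "already gone" as success is what makes the operation idempotent, so two threads racing on the same tree do not fail each other.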
[GitHub] flink pull request #5377: [FLINK-8494][config] Migrate CC#DEFAULT_PARALLELIS...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5377 ---
[GitHub] flink pull request #5332: [FLINK-6464][streaming] Stabilize default window o...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5332 ---
[jira] [Closed] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
[ https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8476. --- Resolution: Fixed master: 83eb8e143ef64e56974a334ca96e10c011f4a32c > ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused > -- > > Key: FLINK-8476 > URL: https://issues.apache.org/jira/browse/FLINK-8476 > Project: Flink > Issue Type: Improvement > Components: Configuration >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably > be deprecated. > [~till.rohrmann]
[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5342#discussion_r164766198 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java --- @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; + +// TODO: Make bucket granularity adaptable +/** + * A TwoInputStreamOperator to execute time-bounded stream inner joins. + * + * By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive.
+ * + * As soon as elements are joined they are passed to a user-defined {@link JoinedProcessFunction}, + * as a {@link Tuple2}, with f0 being the left element and f1 being the right element + * + * @param The type of the elements in the left stream + * @param The type of the elements in the right stream + * @param The output type created by the user-defined function + */ +public class TimeBoundedStreamJoinOperator + extends AbstractUdfStreamOperator> + implements TwoInputStreamOperator { + + private final long lowerBound; + private final long upperBound; + + private final long inverseLowerBound; + private final long inverseUpperBound; + + private final boolean lowerBoundInclusive; + private final boolean upperBoundInclusive; + + private final long bucketGranularity = 1; + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT"; + private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT"; + + private transient ValueState lastCleanupRightBuffer; + private transient ValueState lastCleanupLeftBuffer; + + private transient MapState>> leftBuffer; + private transient MapState>> rightBuffer; + + private final TypeSerializer leftTypeSerializer; + private final TypeSerializer rightTypeSerializer; + + private transient TimestampedCollector collector; + + private ContextImpl context; + + /** +* Creates a new TimeBoundedStreamJoinOperator. +* +* @param lowerBound The lower bound for evaluating if elements should be joined +* @param upperBound The upper bound for evaluating if elements should be joined +* @param lowerBoundInclusive Whether or not to include elements where the timestamp matches +
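The join condition in the Javadoc, t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound] with each end inclusive or exclusive, can be written as a small predicate. This is a hypothetical sketch, not code from the PR; in particular, reading `inverseLowerBound`/`inverseUpperBound` as the negated, swapped bounds used when a right element probes the left buffer is an assumption based only on the field names.

```java
// Hypothetical sketch of the time-bounded join predicate; not the PR's implementation.
class TimeBoundSketch {

    // True if rightTs lies in [leftTs + lowerBound, leftTs + upperBound],
    // with each end inclusive or exclusive as configured.
    static boolean shouldJoin(long leftTs, long rightTs,
                              long lowerBound, long upperBound,
                              boolean lowerInclusive, boolean upperInclusive) {
        long low = leftTs + lowerBound;
        long high = leftTs + upperBound;
        boolean aboveLow = lowerInclusive ? rightTs >= low : rightTs > low;
        boolean belowHigh = upperInclusive ? rightTs <= high : rightTs < high;
        return aboveLow && belowHigh;
    }

    // The same condition seen from a right element: leftTs must lie in
    // [rightTs - upperBound, rightTs - lowerBound], so the bounds are negated and
    // swapped, and their inclusiveness flags swap with them.
    static boolean shouldJoinFromRight(long rightTs, long leftTs,
                                       long lowerBound, long upperBound,
                                       boolean lowerInclusive, boolean upperInclusive) {
        long inverseLower = -upperBound;
        long inverseUpper = -lowerBound;
        return shouldJoin(rightTs, leftTs, inverseLower, inverseUpper,
                upperInclusive, lowerInclusive);
    }
}
```

For example, a left element at ts 10 with bounds [-2, 3], both inclusive, joins right elements with ts 8 through 13. Probing from either side gives the same answer, which is what lets each input check the other side's buffer symmetrically.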
[jira] [Closed] (FLINK-8494) Migrate CC#DEFAULT_PARALLELISM_KEY
[ https://issues.apache.org/jira/browse/FLINK-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8494. --- Resolution: Fixed master: 0e20b613087e1b326e05674e3d532ea4aa444bc3
[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden
[ https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345158#comment-16345158 ] ASF GitHub Bot commented on FLINK-8407: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5369 That makes sense to me. I just wonder what you mean by "add a test for the Java API"... > Setting the parallelism after a partitioning operation should be forbidden > -- > > Key: FLINK-8407 > URL: https://issues.apache.org/jira/browse/FLINK-8407 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Major > > Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} > create new {{DataStreams}}, which allow users to set parallelisms for > them. However, the {{PartitionTransformations}} in these returned > {{DataStreams}} only add virtual nodes, whose parallelisms cannot be > specified, to the execution graph. We should forbid users from setting the > parallelism after a partitioning operation, since such settings have no effect. > The corresponding documentation should also be updated.
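The issue's argument is that a partitioning step only adds a virtual node, so a parallelism set on it has nowhere to take effect. One way to forbid it is to reject the call eagerly on partition-only streams. The sketch below models this with hypothetical classes; it is not Flink's `DataStream` or `PartitionTransformation` API, and the actual change in PR #5369 may differ.

```java
// Hypothetical model of a stream graph node; not Flink's DataStream API.
class PartitionParallelismSketch {

    static class SketchStream {
        private final boolean partitionOnly; // true for virtual nodes from shuffle/rescale
        private int parallelism = 1;

        SketchStream(boolean partitionOnly) {
            this.partitionOnly = partitionOnly;
        }

        // A partitioning step produces a virtual node with no parallelism of its own.
        SketchStream shuffle() {
            return new SketchStream(true);
        }

        // Fail fast instead of silently ignoring the setting on a virtual node.
        SketchStream setParallelism(int parallelism) {
            if (partitionOnly) {
                throw new UnsupportedOperationException(
                        "Parallelism cannot be set after a partitioning operation.");
            }
            this.parallelism = parallelism;
            return this;
        }

        int getParallelism() {
            return parallelism;
        }
    }
}
```

Failing at the call site surfaces the mistake when the job is built, rather than leaving a setting that appears to work but is ignored in the execution graph.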
[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables
[ https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345155#comment-16345155 ] ASF GitHub Bot commented on FLINK-8475: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5384 More reason for the more advanced configuration stuff where you can actually specify such things directly in the configuration code. 😅 Changes look good now, though. 👍 > Move remaining sections to generated tables > --- > > Key: FLINK-8475 > URL: https://issues.apache.org/jira/browse/FLINK-8475 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > >
[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5369 That makes sense to me. I just wonder what you mean by "add a test for the Java API"... ---