[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846626#comment-17846626 ] Shuai Xu edited comment on FLINK-34380 at 5/16/24 1:58 AM: --- Hi [~rovboyko], sorry for the late reply. Regarding the incorrect order of output records: the minibatch optimization is designed to guarantee eventual consistency, and the fix you mentioned was considered when the PR was reviewed. Flink is a distributed real-time processing system; the output order could be guaranteed on a single node by using a LinkedHashMap, but it cannot be guaranteed when the join operator runs on multiple nodes, so I think it makes little sense to preserve the order here. Regarding the RowKind: this was also reviewed. As you mentioned, it is a common problem of the MiniBatch functionality; it does not influence the final result, and from a cost/benefit perspective it is tolerable. was (Author: JIRAUSER300096): Hi [~rovboyko] , sorry for late reply. For the incorrect order of output records, the minibatch optimization is designed to guanrantee final consistency. And the fix you mentioned has been considered when the pr was reviewed. Flink is a distributed realtime processing system. The order of output could be guanranteed on a node by using LinkedHashMap, however, it could not be guranteed when join operator runs on multiple nodes. So I think it makes little sense to keep the order here. For the Rowkind, it was also reviewed. As you mentioned, it is a common problem of MiniBatch functionality. It does not influence final result. From the benefit perspective, this problem could be tolerable.
> Strange RowKind and records about intermediate output when using minibatch join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.19.0
> Reporter: xuyang
> Priority: Major
> Fix For: 1.20.0
>
> {code:java}
> // Add it in CalcItCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
>     changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>     changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>     changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>     changelogRow("-D", java.lang.Integer.valueOf(1), "99")
>   )
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
>     s"""
>        |CREATE TABLE t1 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>        """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
>     s"""
>        |CREATE TABLE t2 (
>        |  a int,
>        |  b string
>        |) WITH (
>        |  'connector' = 'values',
>        |  'data-id' = '$dataId',
>        |  'bounded' = 'false'
>        |)
>        """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
>     .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> } {code}
> Output:
> {code:java}
> +----+---+----+----+----+
> | op | a |  b | a0 | b0 |
> +----+---+----+----+----+
> | +U | 1 |  1 |  1 | 99 |
> | +U | 1 | 99 |  1 | 99 |
> | -U | 1 |  1 |  1 | 99 |
> | -D | 1 | 99 |  1 | 99 |
> +----+---+----+----+----+{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
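The "eventual consistency" argument in the comment above can be checked mechanically: replaying the minibatch output from the bug report into a row multiset ends in the same (empty) final state as a conventionally ordered changelog would. Below is a minimal sketch, assuming a simplified string encoding of rows purely for illustration; it is not Flink's actual changelog machinery.

```java
import java.util.HashMap;
import java.util.Map;

// Replays a changelog into a multiset of rows: +I/+U add one occurrence of a
// row, -U/-D retract one. Two emission orders that carry the same net changes
// converge to the same final state, which is what minibatch guarantees.
public class ChangelogReplay {
    static Map<String, Integer> replay(String[][] events) {
        Map<String, Integer> state = new HashMap<>();
        for (String[] e : events) {
            String kind = e[0], row = e[1];
            int delta = (kind.equals("+I") || kind.equals("+U")) ? 1 : -1;
            state.merge(row, delta, Integer::sum);
            state.values().removeIf(c -> c == 0); // drop fully retracted rows
        }
        return state;
    }

    public static void main(String[] args) {
        // Order produced by the minibatch join in the bug report.
        String[][] minibatch = {
            {"+U", "1,1,1,99"}, {"+U", "1,99,1,99"},
            {"-U", "1,1,1,99"}, {"-D", "1,99,1,99"},
        };
        // A well-ordered emission of the same net changes.
        String[][] ordered = {
            {"+U", "1,1,1,99"}, {"-U", "1,1,1,99"},
            {"+U", "1,99,1,99"}, {"-D", "1,99,1,99"},
        };
        System.out.println(replay(minibatch).equals(replay(ordered))); // true: both end empty
    }
}
```

The intermediate emissions differ (which is the "strange" part of the report), but a materializing sink that applies retractions arrives at the same result either way.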
[jira] [Commented] (FLINK-35230) Split FlinkSqlParserImplTest to reduce the code lines.
[ https://issues.apache.org/jira/browse/FLINK-35230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840623#comment-17840623 ] Shuai Xu commented on FLINK-35230: -- [~lsy] I'd like to take this; would you assign it to me?

> Split FlinkSqlParserImplTest to reduce the code lines.
> ---
>
> Key: FLINK-35230
> URL: https://issues.apache.org/jira/browse/FLINK-35230
> Project: Flink
> Issue Type: Technical Debt
> Components: Table SQL / API
> Reporter: Feng Jin
> Priority: Major
>
> With the increasing extension of Calcite syntax, the current FlinkSqlParserImplTest has reached nearly 3,000 lines of code. Once it exceeds the limit, the code-style check fails:
> {code:java}
> 08:33:19.679 [ERROR] src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:[1] (sizes) FileLength: File length is 3,166 lines (max allowed is 3,100).
> {code}
> To facilitate future syntax extensions, I suggest that we split FlinkSqlParserImplTest and place each type of syntax in a separate Java test class, to avoid the continuous growth of the original test class.
> My current idea is:
> Since *FlinkSqlParserImplTest* currently inherits {*}SqlParserTest{*}, and *SqlParserTest* itself contains many unit tests, for the convenience of future test splits we should introduce a base *ParserTestBase* inheriting {*}SqlParserTest{*} and disable the original unit tests in {*}SqlParserTest{*}. This will make it easier to write the relevant unit tests during the subsequent splitting, without repeatedly executing the unit tests inside SqlParserTest.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59113&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839947#comment-17839947 ] Shuai Xu commented on FLINK-35184: -- Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> ---
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.19.0
> Reporter: Roman Boyko
> Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce it, just launch the following test within StreamingMiniBatchJoinOperatorTest:
>
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws Exception {
>     leftTypeInfo =
>         InternalTypeInfo.of(
>             RowType.of(
>                 new LogicalType[] {new IntType(), new BigIntType()},
>                 new String[] {"id1", "val1"}));
>     rightTypeInfo =
>         InternalTypeInfo.of(
>             RowType.of(
>                 new LogicalType[] {new IntType(), new BigIntType()},
>                 new String[] {"id2", "val2"}));
>     leftKeySelector =
>         HandwrittenSelectorUtil.getRowDataSelector(
>             new int[] {0},
>             leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
>     rightKeySelector =
>         HandwrittenSelectorUtil.getRowDataSelector(
>             new int[] {0},
>             rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
>     joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
>     super.beforeEach(testInfo);
>     testHarness.setStateTtlProcessingTime(1);
>     testHarness.processElement2(insertRecord(1, 1L));
>     testHarness.processElement1(insertRecord(1, 4294967296L));
>     testHarness.processElement2(insertRecord(1, 4294967296L));
>     testHarness.processElement2(deleteRecord(1, 1L));
>     testHarness.close();
>     assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 1, 4294967296L));
> } {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
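The values in the test above are not arbitrary: `Long.hashCode` XORs the upper 32 bits into the lower ones, so 1L and 4294967296L (2^32) produce the same hash, and a row-level hash composed from field hashes collides as well. A quick self-contained check; using `Arrays.deepHashCode` as a stand-in for the row hash is an assumption for illustration, not the exact GenericRowData implementation:

```java
import java.util.Arrays;

// Long.hashCode(l) is (int)(l ^ (l >>> 32)), so any two longs whose XOR-fold
// of upper and lower 32 bits is equal collide. 1L and 2^32 are such a pair,
// and rows built from those fields collide too.
public class HashCollisionDemo {
    public static void main(String[] args) {
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // 1 (2^32 folds back to 1)

        Object[] rowA = {1, 1L};          // like insertRecord(1, 1L)
        Object[] rowB = {1, 4294967296L}; // like insertRecord(1, 4294967296L)
        System.out.println(Arrays.deepHashCode(rowA) == Arrays.deepHashCode(rowB)); // true
    }
}
```

This is why a bundle keyed only by hash can conflate the two distinct rows unless it also compares the rows for equality.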
[jira] [Comment Edited] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839947#comment-17839947 ] Shuai Xu edited comment on FLINK-35184 at 4/23/24 5:46 AM: --- [~rovboyko] , absolutely, please feel free to start the implementation. was (Author: JIRAUSER300096): Absolutely, please feel free to start the implementation.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839920#comment-17839920 ] Shuai Xu commented on FLINK-35184: -- Hi [~rovboyko], actually the hash collision cannot be avoided entirely even with BinaryRowData, which would only reduce its probability to some extent. The solution you mentioned works for me.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator
[ https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839464#comment-17839464 ] Shuai Xu commented on FLINK-35184: -- Hi [~rovboyko], thanks for reporting this bug, which is caused by the hashCode() implementation in GenericRowData. Could you please give a rough explanation of your solution before implementing it?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34694) Delete num of associations for streaming outer join
[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835668#comment-17835668 ] Shuai Xu edited comment on FLINK-34694 at 4/10/24 9:43 AM: --- Hi [~rovboyko], the method `otherRecordHasNoAssociationsInInputSide` in your code would be invoked for every associated record, which indeed increases the overhead of state access. It is difficult to say which has the greater share: that increased cost, or the savings from removing `updateNumOfAssociations()` that you mentioned. Intuitively, this may depend on the data distribution itself, so a detailed test report would illustrate the trade-off better, and a comparison table covering the JOIN queries of Nexmark would be good. Besides, rewriting SQL to hit this optimization can also indicate the scenarios in which it takes effect. was (Author: JIRAUSER300096): Hi [~rovboyko], The method `otherRecordHasNoAssociationsInInputSide` in your code would be invoked for every associatedRecord. This indeed increases the overhead of state access. It is difficult to say which one has a greater proportion between the increased costs and the reduced expenses of the method 'updateNumOfAssociations()' you mentioned. Intuitively, this may depend on the data distribution itself. So a detailed test report could better illustrate the problem. And a comparison table that covers JOIN keyword in queries of nexmark is good. Besides this, rewrite sql for hitting this optimization can also indicate the scenarios in which this optimization takes effect.
> Delete num of associations for streaming outer join
> ---
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Reporter: Roman Boyko
> Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png, image-2024-03-15-19-52-24-391.png
>
> Currently in StreamingJoinOperator (non-window), in the case of OUTER JOIN the OuterJoinRecordStateView is used to store an additional field - the number of associations for every record. This leads to storing additional Tuple2 and Integer data for every record in the outer state.
> This functionality is used only for sending:
> * -D[nullPaddingRecord] in case of the first Accumulate record
> * +I[nullPaddingRecord] in case of the last Revoke record
> The overhead of storing additional data and updating the counter for associations can be avoided by checking the input state for these events.
>
> The proposed solution can be found here - [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase the performance up to 20%:
> * Before: !image-2024-03-15-19-52-24-391.png!
> * After: !image-2024-03-15-19-51-29-282.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
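The role of the association counter described above can be sketched in a few lines: the outer-join side retracts the null-padded row when a key gains its first association and re-emits it when the last association is revoked. This is a simplified, hypothetical model of the behavior the issue describes, not Flink's actual OuterJoinRecordStateView:

```java
import java.util.ArrayList;
import java.util.List;

// Models one outer-side key tracking how many rows on the other side
// currently associate with it, emitting the -D/+I null-padding transitions
// that the real operator derives from numOfAssociations.
public class AssociationCounter {
    private int numOfAssociations = 0;
    final List<String> emitted = new ArrayList<>();

    public void accumulate(String row) {
        if (numOfAssociations == 0) {
            emitted.add("-D[nullPadding]"); // first match retracts the padded row
        }
        numOfAssociations++;
        emitted.add("+I[" + row + "]");
    }

    public void revoke(String row) {
        emitted.add("-D[" + row + "]");
        numOfAssociations--;
        if (numOfAssociations == 0) {
            emitted.add("+I[nullPadding]"); // last revoke restores the padded row
        }
    }

    public static void main(String[] args) {
        AssociationCounter c = new AssociationCounter();
        c.accumulate("a=1");
        c.revoke("a=1");
        System.out.println(c.emitted); // [-D[nullPadding], +I[a=1], -D[a=1], +I[nullPadding]]
    }
}
```

The optimization proposed in the issue would derive these same two transitions by probing the input-side state instead of persisting the counter, trading extra state reads for less state written - which is exactly the trade-off debated in the comments.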
[jira] [Commented] (FLINK-34694) Delete num of associations for streaming outer join
[ https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835648#comment-17835648 ] Shuai Xu commented on FLINK-34694: -- Hi [~rovboyko], your idea looks interesting. After reading your code, I found that this optimization does not actually reduce the overhead of state access; rather, it reduces the state size to some extent. IMO, the marginal reduction in size may not significantly impact storage overhead, given that it constitutes a small fraction of the records held in state. BTW, if you plan to pursue this optimization further, could you provide more comprehensive benchmark details? Benchmark results from multiple runs and the overall performance across all queries would be convincing.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822433#comment-17822433 ] Shuai Xu commented on FLINK-34380: -- Let me take a look.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources
[ https://issues.apache.org/jira/browse/FLINK-34500 ] Shuai Xu deleted comment on FLINK-34500: -- was (Author: JIRAUSER300096): Hi, could I take this verification?

> Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources
> ---
>
> Key: FLINK-34500
> URL: https://issues.apache.org/jira/browse/FLINK-34500
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Parent, Table SQL / API
> Affects Versions: 1.19.0
> Reporter: SuDewei
> Assignee: Yun Tang
> Priority: Blocker
> Fix For: 1.19.0
>
> This issue aims to verify [FLIP-367|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429150]. Volunteers can verify it by following the [doc changes|https://github.com/apache/flink/pull/24234]. Since currently only the pre-defined DataGen connector and user-defined connectors support setting source parallelism, volunteers can verify it through the DataGen connector.
> The basic steps include:
> 1. Start a Flink cluster and submit a Flink SQL job to the cluster.
> 2. In this Flink job, use the DataGen SQL connector to generate data.
> 3. Specify the parameter scan.parallelism in the DataGen connector options as a user-defined parallelism instead of the default parallelism.
> 4. Observe whether the parallelism of the source has changed on the job graph in the Flink Application UI, and whether the shuffle mode is correct.
> If everything is normal, you will see that the parallelism of the source operator is indeed different from that of downstream operators, and the shuffle mode is rebalance by default.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34500) Release Testing: Verify FLINK-33261 Support Setting Parallelism for Table/SQL Sources
[ https://issues.apache.org/jira/browse/FLINK-34500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819964#comment-17819964 ] Shuai Xu commented on FLINK-34500: -- Hi, could I take this verification?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34219) Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34219: - Release Note: Support minibatch regular join to reduce intermediate results and mitigate record amplification in cascading join scenarios.

> Introduce a new join operator to support minibatch
> ---
>
> Key: FLINK-34219
> URL: https://issues.apache.org/jira/browse/FLINK-34219
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Runtime
> Reporter: Shuai Xu
> Assignee: Shuai Xu
> Priority: Major
> Fix For: 1.19.0
>
> This is the parent task of FLIP-415.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818612#comment-17818612 ] Shuai Xu commented on FLINK-34346: -- Hi, I have finished this testing. The exception that I think could be improved has been linked to this Jira.

> Release Testing: Verify FLINK-24024 Support session Window TVF
> ---
>
> Key: FLINK-34346
> URL: https://issues.apache.org/jira/browse/FLINK-34346
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Affects Versions: 1.19.0
> Reporter: xuyang
> Assignee: Shuai Xu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.19.0
>
> Session window TVF is ready. Users can use session window TVF aggregation instead of the legacy session group window aggregation. Someone can verify this feature by following the [doc|https://github.com/apache/flink/pull/24250], although it is still being reviewed.
> Furthermore, although session window join, session window rank, and session window deduplicate are in an experimental state, if someone finds bugs in them, you could also open a Jira linked to this one to report them.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818611#comment-17818611 ] Shuai Xu commented on FLINK-34355: -- Hi, I have finished this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
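The three test scenarios above can be illustrated with a small, self-contained sketch of named-argument binding. This is plain Scala and does not use Flink's actual UDF/procedure API; the `Param` and `NamedArgs` names are purely illustrative:

```scala
// Illustrative model of named-parameter binding (not Flink's real API).
// A parameter is required unless it carries a default value.
final case class Param(name: String, default: Option[Any] = None)

object NamedArgs {
  // Bind named arguments to the declared parameters.
  // Scenario 1: all parameters given; scenario 2: omitted optional ones
  // fall back to their defaults; scenario 3: a missing required parameter
  // fails fast with a descriptive error.
  def bind(params: Seq[Param], args: Map[String, Any]): Map[String, Any] =
    params.map { p =>
      val value = args
        .get(p.name)
        .orElse(p.default)
        .getOrElse(throw new IllegalArgumentException(
          s"Missing required parameter '${p.name}'"))
      p.name -> value
    }.toMap
}
```

For example, `NamedArgs.bind(Seq(Param("a"), Param("b", Some(0))), Map("a" -> 1))` fills `b` with its default, while omitting `a` raises the error that scenario 3 expects.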
[jira] [Created] (FLINK-34462) Session window with negative parameter throws unclear exception
Shuai Xu created FLINK-34462: Summary: Session window with negative parameter throws unclear exception Key: FLINK-34462 URL: https://issues.apache.org/jira/browse/FLINK-34462 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: Shuai Xu Setting an invalid parameter in a session window produces an unclear error. {code:java} // add test in WindowAggregateITCase def testEventTimeSessionWindowWithInvalidName(): Unit = { val sql = """ |SELECT | window_start, | window_end, | COUNT(*), | SUM(`bigdec`), | MAX(`double`), | MIN(`float`), | COUNT(DISTINCT `string`), | concat_distinct_agg(`string`) |FROM TABLE( | SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '-5' SECOND)) |GROUP BY window_start, window_end """.stripMargin val sink = new TestingAppendSink tEnv.sqlQuery(sql).toDataStream.addSink(sink) env.execute() } {code} {code:java} java.lang.AssertionError: Sql optimization: Assertion error: null at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:178) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:128) at org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:60) at org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase.testEventTimeSessionWindowWithInvalidName(WindowAggregateITCase.scala:1239) at java.lang.reflect.Method.invoke(Method.java:498) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Iterator.forEachRemaining(Iterator.java:116) at scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26) at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at ja
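The stack trace above surfaces as an opaque "Assertion error: null" from deep inside the planner. One way to make it actionable, as this issue requests, is to validate the session gap upfront, roughly as in this sketch (plain Scala; `SessionGapValidator` is a hypothetical helper, not Flink's actual planner code):

```scala
import java.time.Duration

object SessionGapValidator {
  // Hypothetical upfront check: reject a non-positive session gap with a
  // message that names the offending clause instead of failing later in
  // the optimizer with "Sql optimization: Assertion error: null".
  def validateGap(gap: Duration): Duration = {
    if (gap.isNegative || gap.isZero) {
      throw new IllegalArgumentException(
        s"SESSION window gap must be positive, but was $gap. " +
          "Check the INTERVAL passed to SESSION(TABLE ..., DESCRIPTOR(...), <gap>).")
    }
    gap
  }
}
```

With such a check, `INTERVAL '-5' SECOND` from the test above would fail immediately with a message pointing at the negative gap.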
[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818384#comment-17818384 ] Shuai Xu commented on FLINK-34378: -- Hi [~xuyangzhong]. This is expected behavior. To maintain order, additional data structures would need to be introduced, which would degrade performance, and the ordering would only take effect when parallelism is set to 1. If order preservation is required with a parallelism of 1, it suffices to simply turn off the minibatch feature. > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can reproduce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > 
println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} > Result > {code:java} > ++---+---+---+---+ > | op | a | b | a0| b0| > ++---+---+---+---+ > | +I | 3 | 3 | 3 | 3 | > | +I | 7 | 7 | 7 | 7 | > | +I | 2 | 2 | 2 | 2 | > | +I | 5 | 5 | 5 | 5 | > | +I | 1 | 1 | 1 | 1 | > | +I | 6 | 6 | 6 | 6 | > | +I | 4 | 4 | 4 | 4 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+---+---+ > {code} > When I do not use minibatch join, the result is : > {code:java} > ++---+---+++ > | op | a | b | a0 | b0 | > ++---+---+++ > | +I | 1 | 1 | 1 | 1 | > | +I | 2 | 2 | 2 | 2 | > | +I | 3 | 3 | 3 | 3 | > | +I | 4 | 4 | 4 | 4 | > | +I | 5 | 5 | 5 | 5 | > | +I | 6 | 6 | 6 | 6 | > | +I | 7 | 7 | 7 | 7 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+++ > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
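The ordering trade-off discussed in the comment above can be seen with plain JDK maps, the kind of structure a minibatch buffer could be built on: a `LinkedHashMap` preserves insertion order at the cost of extra bookkeeping, while a `HashMap` gives no ordering guarantee. A standalone sketch, not Flink's actual buffer implementation:

```scala
import java.util.{HashMap => JHashMap, LinkedHashMap => JLinkedHashMap}
import scala.jdk.CollectionConverters._

object BundleOrderDemo {
  // Buffer keys in arrival order, then return the order in which the map
  // iterates them -- mimicking the order a minibatch bundle would flush in.
  def flushOrder(buffer: java.util.Map[String, Int], keys: Seq[String]): Seq[String] = {
    keys.zipWithIndex.foreach { case (k, i) => buffer.put(k, i) }
    buffer.keySet().asScala.toSeq
  }
}
```

With `new JLinkedHashMap`, `flushOrder` returns the keys exactly as inserted; with `new JHashMap`, only the set of keys is guaranteed, which matches the reordered `+I` rows in the report above. Note that even a `LinkedHashMap` only helps within a single parallel instance, as the comment points out.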
[jira] [Comment Edited] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814193#comment-17814193 ] Shuai Xu edited comment on FLINK-34355 at 2/5/24 7:19 AM: -- Hi, I'd like to take this verification. cc [~hackergin] . was (Author: JIRAUSER300096): Hi, I'd like to take this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34355) Release Testing: Verify FLINK-34054 Support named parameters for functions and procedures
[ https://issues.apache.org/jira/browse/FLINK-34355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814193#comment-17814193 ] Shuai Xu commented on FLINK-34355: -- Hi, I'd like to take this verification. > Release Testing: Verify FLINK-34054 Support named parameters for functions > and procedures > - > > Key: FLINK-34355 > URL: https://issues.apache.org/jira/browse/FLINK-34355 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Test suggestion: > 1. Implement a test UDF or Procedure and support Named Parameters. > 2. When calling a function or procedure, use named parameters to verify if > the results are as expected. > You can test the following scenarios: > 1. Normal usage of named parameters, fully specifying each parameter. > 2. Omitting unnecessary parameters. > 3. Omitting necessary parameters to confirm if an error is reported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814192#comment-17814192 ] Shuai Xu commented on FLINK-34304: -- This test is tracked in another issue, FLINK-34349, so this issue will be closed. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304 ] Shuai Xu deleted comment on FLINK-34304: -- was (Author: JIRAUSER300096): Minibatch join is ready. Users could improve performance in regular stream join scenarios. Someone can verify this feature by following the [doc]([https://github.com/apache/flink/pull/24240)] although it is still being reviewed. If someone finds some bugs about this feature, you open a Jira linked this one to report them. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34349: - Description: Minibatch join is ready. Users can use it to improve performance in regular stream join scenarios. Someone can verify this feature by following the doc ([https://github.com/apache/flink/pull/24240]) although it is still being reviewed. If someone finds bugs in this feature, you could open a Jira linked to this one to report them. > Release Testing: Verify FLINK-34219 Introduce a new join operator to support > minibatch > -- > > Key: FLINK-34349 > URL: https://issues.apache.org/jira/browse/FLINK-34349 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Minibatch join is ready. Users can use it to improve performance in regular stream > join scenarios. > Someone can verify this feature by following the > doc ([https://github.com/apache/flink/pull/24240]) although it is still > being reviewed. > If someone finds bugs in this feature, you could open a Jira linked to this > one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
Shuai Xu created FLINK-34349: Summary: Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch Key: FLINK-34349 URL: https://issues.apache.org/jira/browse/FLINK-34349 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Affects Versions: 1.19.0 Reporter: Shuai Xu Assignee: Shuai Xu Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814016#comment-17814016 ] Shuai Xu commented on FLINK-34300: -- Hi, [~xuyangzhong]. I'd like to take this verification. > Release Testing Instructions: Verify FLINK-24024 Support session Window TVF > --- > > Key: FLINK-34300 > URL: https://issues.apache.org/jira/browse/FLINK-34300 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34304) Release Testing Instructions: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814015#comment-17814015 ] Shuai Xu commented on FLINK-34304: -- Minibatch join is ready. Users can use it to improve performance in regular stream join scenarios. Someone can verify this feature by following the doc ([https://github.com/apache/flink/pull/24240]) although it is still being reviewed. If someone finds bugs in this feature, you could open a Jira linked to this one to report them. > Release Testing Instructions: Verify FLINK-34219 Introduce a new join > operator to support minibatch > --- > > Key: FLINK-34304 > URL: https://issues.apache.org/jira/browse/FLINK-34304 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) Supports mini-batch for streaming regular join
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Summary: Supports mini-batch for streaming regular join (was: End to end implementation of minibatch join) > Supports mini-batch for streaming regular join > -- > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime > Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Implement minibatch join in E2E which includes both plan and runtime parts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) End to end implementation of minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Description: Implement minibatch join in E2E which includes both plan and runtime parts. (was: Get minibatch join operator involved in which includes both plan and operator. Implement minibatch join in E2E.) Summary: End to end implementation of minibatch join (was: Get minibatch join operator involved) > End to end implementation of minibatch join > --- > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime > Reporter: Shuai Xu >Assignee: Shuai Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Implement minibatch join in E2E which includes both plan and runtime parts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34256) Add a documentation section for minibatch join
Shuai Xu created FLINK-34256: Summary: Add a documentation section for minibatch join Key: FLINK-34256 URL: https://issues.apache.org/jira/browse/FLINK-34256 Project: Flink Issue Type: Sub-task Components: Documentation Affects Versions: 1.19.0 Reporter: Shuai Xu We should add a minibatch join section in Performance Tuning to explain the usage and principle of minibatch-join. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34222) Get minibatch join operator involved
[ https://issues.apache.org/jira/browse/FLINK-34222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-34222: - Description: Get minibatch join operator involved in which includes both plan and operator. Implement minibatch join in E2E. (was: Get minibatch join operator involved which includes both plan and operator) > Get minibatch join operator involved > > > Key: FLINK-34222 > URL: https://issues.apache.org/jira/browse/FLINK-34222 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime > Reporter: Shuai Xu >Priority: Major > Fix For: 1.19.0 > > > Get minibatch join operator involved in which includes both plan and > operator. Implement minibatch join in E2E. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34222) Get minibatch join operator involved
Shuai Xu created FLINK-34222: Summary: Get minibatch join operator involved Key: FLINK-34222 URL: https://issues.apache.org/jira/browse/FLINK-34222 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 Get minibatch join operator involved which includes both plan and operator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34221) Introduce operator for minibatch join
Shuai Xu created FLINK-34221: Summary: Introduce operator for minibatch join Key: FLINK-34221 URL: https://issues.apache.org/jira/browse/FLINK-34221 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 Introduce operator that implements minibatch join -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34220) introduce buffer bundle for minibatch join
Shuai Xu created FLINK-34220: Summary: introduce buffer bundle for minibatch join Key: FLINK-34220 URL: https://issues.apache.org/jira/browse/FLINK-34220 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 introduce buffer bundle for storing records to implement minibatch join -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34219) Introduce a new join operator to support minibatch
Shuai Xu created FLINK-34219: Summary: Introduce a new join operator to support minibatch Key: FLINK-34219 URL: https://issues.apache.org/jira/browse/FLINK-34219 Project: Flink Issue Type: New Feature Components: Table SQL / Runtime Reporter: Shuai Xu Fix For: 1.19.0 This is the parent task of FLIP-415. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Summary: JsonObjectAggFunction can't retract previous data which is invalid when enabling local global agg (was: JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg) > JsonObjectAggFunction can't retract previous data which is invalid when > enabling local global agg > - > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . > {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Summary: JsonObjectAggFunction can't retract previous data which is invalid when enable local global agg (was: jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg) > JsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . > {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Description: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. was: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . 
{code:java} // code placeholder def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} // code placeholder List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. > jsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal and minibatch in > sql/AggregateITCase . 
> {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
[ https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-33689: - Description: Run the test as following and enable LocalGlobal in sql/AggregateITCase . {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. was: Run the test as following and enable LocalGlobal and minibatch in sql/AggregateITCase . 
{code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as following. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. > jsonObjectAggFunction can't retract previous data which is invalid when > enable local global agg > --- > > Key: FLINK-33689 > URL: https://issues.apache.org/jira/browse/FLINK-33689 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Shuai Xu >Priority: Major > Labels: pull-request-available > > Run the test as following and enable LocalGlobal in sql/AggregateITCase . 
> {code:java} > def testGroupJsonObjectAggWithRetract(): Unit = { > val data = new mutable.MutableList[(Long, String, Long)] > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > data.+=((2L, "Hallo", 2L)) > val sql = > s""" >|select >| JSON_OBJECTAGG(key k value v) >|from (select >| cast(SUM(a) as string) as k,t as v >| from >| Table6 >| group by t) >|""".stripMargin > val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) > tEnv.createTemporaryView("Table6", t) > val sink = new TestingRetractSink > tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) > env.execute() > val expected = > List( > "{\"30\":2}" > ) > assertThat(sink.getRetractResults).isEqualTo(expected) > } {code} > The result is as following. > {code:java} > List({"14":2,"30":2}) {code} > However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33689) jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg
Shuai Xu created FLINK-33689: Summary: jsonObjectAggFunction can't retract previous data which is invalid when enable local global agg Key: FLINK-33689 URL: https://issues.apache.org/jira/browse/FLINK-33689 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.18.0 Reporter: Shuai Xu Run the following test in sql/AggregateITCase with LocalGlobal and minibatch enabled. {code:java} def testGroupJsonObjectAggWithRetract(): Unit = { val data = new mutable.MutableList[(Long, String, Long)] data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) data.+=((2L, "Hallo", 2L)) val sql = s""" |select | JSON_OBJECTAGG(key k value v) |from (select | cast(SUM(a) as string) as k,t as v | from | Table6 | group by t) |""".stripMargin val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't) tEnv.createTemporaryView("Table6", t) val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() val expected = List( "{\"30\":2}" ) assertThat(sink.getRetractResults).isEqualTo(expected) } {code} The result is as follows. {code:java} List({"14":2,"30":2}) {code} However, \{"14":2} should be retracted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode
[ https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788967#comment-17788967 ] Shuai Xu edited comment on FLINK-33549 at 11/23/23 3:39 AM: Hi [~luoyuxia] , I'd like to fix this. Could you assign it to me? was (Author: JIRAUSER300096): I'd like to fix this. Could you assign it to me? [~luoyuxia] > Exception "Factory does not implement interface YieldingOperatorFactory" > thrown in batch mode > -- > > Key: FLINK-33549 > URL: https://issues.apache.org/jira/browse/FLINK-33549 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: luoyuxia >Priority: Major > > When run a job in batch, it throws the following exception > {code:java} > java.lang.NullPointerException: Factory does not implement interface > org.apache.flink.streaming.api.operators.YieldingOperatorFactory > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67) > at > org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79) > at > org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51) > at > 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) > at java.lang.Thread.run(Thread.java:834) {code} > > When I disable multiple-input by setting > table.optimizer.multiple-input-enabled = false, it works. > Likely introduced by FLINK-23621. > [In > here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60], > when the operator factory is an instance of YieldingOperatorFactory, the mailbox > executor is set on it. But when it is a > BatchMultipleInputStreamOperatorFactory, although the mailbox executor is set on > the factory itself, it is not forwarded to the operators wrapped by > the BatchMultipleInputStreamOperator, so the exception is thrown. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33549) Exception "Factory does not implement interface YieldingOperatorFactory" thrown in batch mode
[ https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788967#comment-17788967 ] Shuai Xu commented on FLINK-33549: -- I'd like to fix this. Could you assign it to me? [~luoyuxia] > Exception "Factory does not implement interface YieldingOperatorFactory" > thrown in batch mode > -- > > Key: FLINK-33549 > URL: https://issues.apache.org/jira/browse/FLINK-33549 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Reporter: luoyuxia >Priority: Major > > When run a job in batch, it throws the following exception > {code:java} > java.lang.NullPointerException: Factory does not implement interface > org.apache.flink.streaming.api.operators.YieldingOperatorFactory > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67) > at > org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79) > at > org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259) > at > org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.(MultipleInputStreamOperatorBase.java:88) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.(BatchMultipleInputStreamOperator.java:48) > at > org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51) > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) > at java.lang.Thread.run(Thread.java:834) {code} > > When I disable multiple-input by setting > table.optimizer.multiple-input-enabled = false, it works. > Likely introduced by FLINK-23621. > [In > here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60], > when the operator factory is an instance of YieldingOperatorFactory, the mailbox > executor is set on it. But when it is a > BatchMultipleInputStreamOperatorFactory, although the mailbox executor is set on > the factory itself, it is not forwarded to the operators wrapped by > the BatchMultipleInputStreamOperator, so the exception is thrown. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
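The missing wiring described in this issue can be sketched with a toy factory hierarchy. This is a hypothetical simplification, not Flink's real classes (the names AsyncLikeFactory and MultipleInputLikeFactory are invented for illustration): an outer wrapper factory receives the mailbox executor, but unless it forwards that executor to each factory it wraps, the wrapped factory's `checkNotNull`-style guard fires with exactly the kind of NPE quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

interface OperatorFactory {
    Object createOperator();
}

// Base factory that may or may not have received a mailbox executor.
abstract class AbstractFactory implements OperatorFactory {
    protected Executor mailboxExecutor;

    void setMailboxExecutor(Executor executor) {
        this.mailboxExecutor = executor;
    }

    Executor getMailboxExecutor() {
        if (mailboxExecutor == null) {
            // Mirrors the guard in the reported stack trace.
            throw new NullPointerException(
                "Factory does not implement interface YieldingOperatorFactory");
        }
        return mailboxExecutor;
    }
}

// Stands in for an AsyncWaitOperatorFactory-style factory that needs the executor.
class AsyncLikeFactory extends AbstractFactory {
    @Override
    public Object createOperator() {
        return getMailboxExecutor(); // throws if the executor was never set
    }
}

// Stands in for the multiple-input wrapper factory: the fix is the explicit
// forwarding of the executor to every wrapped factory before creation.
class MultipleInputLikeFactory extends AbstractFactory {
    private final List<AbstractFactory> wrapped = new ArrayList<>();

    MultipleInputLikeFactory add(AbstractFactory f) {
        wrapped.add(f);
        return this;
    }

    @Override
    public Object createOperator() {
        List<Object> operators = new ArrayList<>();
        for (AbstractFactory f : wrapped) {
            f.setMailboxExecutor(getMailboxExecutor()); // the missing step
            operators.add(f.createOperator());
        }
        return operators;
    }
}
```

Dropping the `setMailboxExecutor` forwarding line reproduces the NPE at wrapped-operator creation time.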
[jira] [Commented] (FLINK-28650) Flink SQL Parsing bug for METADATA
[ https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775580#comment-17775580 ] Shuai Xu commented on FLINK-28650: -- The bug in partial insert with writable metadata is fixed in FLINK-30922. > Flink SQL Parsing bug for METADATA > -- > > Key: FLINK-28650 > URL: https://issues.apache.org/jira/browse/FLINK-28650 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: Jun Qin >Priority: Major > Fix For: 1.19.0 > > > With the following source/sink tables: > {code:sql} > CREATE TABLE sourceTable ( > `key` INT, > `time` TIMESTAMP(3), > `value` STRING NOT NULL, > id INT > ) > WITH ( > 'connector' = 'datagen', > 'rows-per-second'='10', > 'fields.id.kind'='sequence', > 'fields.id.start'='1', > 'fields.id.end'='100' > ); > CREATE TABLE sinkTable1 ( > `time` TIMESTAMP(3) METADATA FROM 'timestamp', > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ) > CREATE TABLE sinkTable2 ( > `time` TIMESTAMP(3),-- without METADATA > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ) > {code} > the following three pass the validation: > {code:sql} > INSERT INTO sinkTable1 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > but this one does not: > {code:sql} > INSERT INTO sinkTable1 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > It failed with > {code:java} > Unknown target column 'time' > {code} > It seems when providing column names in INSERT, the METADATA have an > undesired effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-28650) Flink SQL Parsing bug for METADATA
[ https://issues.apache.org/jira/browse/FLINK-28650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775580#comment-17775580 ] Shuai Xu edited comment on FLINK-28650 at 10/16/23 7:58 AM: This bug is fixed in FLINK-30922. was (Author: JIRAUSER300096): The bug in partial insert with writable metadata is fixed in FLINK-30922. > Flink SQL Parsing bug for METADATA > -- > > Key: FLINK-28650 > URL: https://issues.apache.org/jira/browse/FLINK-28650 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.14.4 >Reporter: Jun Qin >Priority: Major > Fix For: 1.19.0 > > > With the following source/sink tables: > {code:sql} > CREATE TABLE sourceTable ( > `key` INT, > `time` TIMESTAMP(3), > `value` STRING NOT NULL, > id INT > ) > WITH ( > 'connector' = 'datagen', > 'rows-per-second'='10', > 'fields.id.kind'='sequence', > 'fields.id.start'='1', > 'fields.id.end'='100' > ); > CREATE TABLE sinkTable1 ( > `time` TIMESTAMP(3) METADATA FROM 'timestamp', > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ) > CREATE TABLE sinkTable2 ( > `time` TIMESTAMP(3),-- without METADATA > `value` STRING NOT NULL > ) > WITH ( > 'connector' = 'kafka', > ... > ) > {code} > the following three pass the validation: > {code:sql} > INSERT INTO sinkTable1 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 > SELECT > `time`, > `value` > FROM sourceTable; > INSERT INTO sinkTable2 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > but this one does not: > {code:sql} > INSERT INTO sinkTable1 (`time`,`value`) > SELECT > `time`, > `value` > FROM sourceTable; > {code} > It failed with > {code:java} > Unknown target column 'time' > {code} > It seems when providing column names in INSERT, the METADATA have an > undesired effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
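The failure mode reported above is consistent with target columns being resolved against only the sink's physical columns. The sketch below is hypothetical (it is not Flink's planner code; the class and parameter names are invented): `sinkTable1` declares `time` as a writable METADATA column, so a resolver that ignores metadata columns rejects the explicit column list with "Unknown target column 'time'", while the same INSERT without a column list succeeds.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TargetColumnResolution {
    // Resolve an INSERT's explicit target-column list against the sink schema.
    // The fix is to include writable metadata columns in the lookup, not just
    // the physical columns.
    public static void resolve(
            List<String> targetColumns, List<String> physical, List<String> metadata) {
        for (String col : targetColumns) {
            if (!physical.contains(col) && !metadata.contains(col)) {
                throw new IllegalArgumentException("Unknown target column '" + col + "'");
            }
        }
    }

    public static void main(String[] args) {
        // sinkTable1: `time` is METADATA, `value` is physical.
        resolve(Arrays.asList("time", "value"),
                Collections.singletonList("value"),
                Collections.singletonList("time")); // passes with the fix
    }
}
```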
[jira] [Commented] (FLINK-28866) Use DDL instead of legacy method to register the test source in JoinITCase
[ https://issues.apache.org/jira/browse/FLINK-28866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757169#comment-17757169 ] Shuai Xu commented on FLINK-28866: -- Hi, I would like to fix this issue. Could it be assigned to me? > Use DDL instead of legacy method to register the test source in JoinITCase > -- > > Key: FLINK-28866 > URL: https://issues.apache.org/jira/browse/FLINK-28866 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: godfrey he >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756785#comment-17756785 ] Shuai Xu edited comment on FLINK-27741 at 8/21/23 9:58 AM: --- Hi [~chenzihao] , it appears that your pull request has not been approved. Would you like to continue working on it? If not, I would be happy to fix it. was (Author: JIRAUSER300096): Hi, I would like to fix this issue. Could it be assigned to me, please? > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at 
scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > 
org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.
[jira] [Commented] (FLINK-27741) Fix NPE when use dense_rank() and rank() in over aggregation
[ https://issues.apache.org/jira/browse/FLINK-27741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756785#comment-17756785 ] Shuai Xu commented on FLINK-27741: -- Hi, I would like to fix this issue. Could it be assigned to me, please? > Fix NPE when use dense_rank() and rank() in over aggregation > > > Key: FLINK-27741 > URL: https://issues.apache.org/jira/browse/FLINK-27741 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.16.0 >Reporter: chenzihao >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > There has an 'NullPointException' when use RANK() and DENSE_RANK() in over > window. > {code:java} > @Test > def testDenseRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, DENSE_RANK() OVER (PARTITION BY a ORDER BY > proctime) FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > {code:java} > @Test > def testRankOnOver(): Unit = { > val t = failingDataSource(TestData.tupleData5) > .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) > tEnv.registerTable("MyTable", t) > val sqlQuery = "SELECT a, RANK() OVER (PARTITION BY a ORDER BY proctime) > FROM MyTable" > val sink = new TestingAppendSink > tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) > env.execute() > } > {code} > Exception Info: > {code:java} > java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:248) > at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:248) > at scala.collection.SeqLike.size(SeqLike.scala:104) > at scala.collection.SeqLike.size$(SeqLike.scala:104) > at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:242) > at > 
scala.collection.IndexedSeqLike.sizeHintIfCheap(IndexedSeqLike.scala:95) > at > scala.collection.IndexedSeqLike.sizeHintIfCheap$(IndexedSeqLike.scala:95) > at > scala.collection.mutable.ArrayOps$ofInt.sizeHintIfCheap(ArrayOps.scala:242) > at scala.collection.mutable.Builder.sizeHint(Builder.scala:77) > at scala.collection.mutable.Builder.sizeHint$(Builder.scala:76) > at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:21) > at scala.collection.TraversableLike.builder$1(TraversableLike.scala:229) > at scala.collection.TraversableLike.map(TraversableLike.scala:232) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createDenseRankAggFunction(AggFunctionFactory.scala:454) > at > org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:94) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:445) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToAggregateInfoList(AggregateUtil.scala:435) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:381) > at > org.apache.flink.table.planner.plan.utils.AggregateUtil$.transformToStreamAggregateInfoList(AggregateUtil.scala:361) > at > 
org.apache.flink.table.planner.plan.utils.AggregateUtil.transformToStreamAggregateInfoList(AggregateUtil.scala) > at > org.apache.flink.table.planner.plan.
[jira] [Commented] (FLINK-25054) Improve exception message for unsupported hashLength for SHA2 function
[ https://issues.apache.org/jira/browse/FLINK-25054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756784#comment-17756784 ] Shuai Xu commented on FLINK-25054: -- Hi, I would like to fix this issue. Could it be assigned to me, please? > Improve exception message for unsupported hashLength for SHA2 function > -- > > Key: FLINK-25054 > URL: https://issues.apache.org/jira/browse/FLINK-25054 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.12.3 >Reporter: DingGeGe >Priority: Major > Attachments: image-2021-11-25-16-59-56-699.png > > Original Estimate: 1h > Remaining Estimate: 1h > > 【exception sql】 > SELECT > SHA2(, 128) > FROM > > 【effect】 > when the SQL is long, it's hard to tell where the problem is > 【reason】 > the built-in function SHA2 does not support hashLength "128", but that cannot be > understood from the > 【Exception log】 > !image-2021-11-25-16-59-56-699.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
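The kind of up-front validation being requested can be sketched as follows. This is an illustrative implementation, not Flink's actual SHA2 code, and the assumption that the supported lengths are exactly 224, 256, 384, and 512 comes from the standard SHA-2 family (MySQL's SHA2() additionally accepts 0 as an alias for 256): rejecting an unsupported hashLength early, with the valid values spelled out, gives a much clearer message than a failure deep inside the job.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class Sha2Lengths {
    private static final List<Integer> SUPPORTED = Arrays.asList(224, 256, 384, 512);

    public static byte[] sha2(byte[] input, int hashLength) {
        // Validate eagerly and name the supported values in the error message.
        if (!SUPPORTED.contains(hashLength)) {
            throw new IllegalArgumentException(
                "SHA2 does not support hashLength " + hashLength
                    + "; supported values are " + SUPPORTED);
        }
        try {
            // JDK providers expose the SHA-2 family as "SHA-224" .. "SHA-512".
            return MessageDigest.getInstance("SHA-" + hashLength).digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // unreachable for validated lengths
        }
    }
}
```

With this shape, SHA2(x, 128) fails immediately with "supported values are [224, 256, 384, 512]" instead of an opaque error.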
[jira] [Commented] (FLINK-32824) Port Calcite's fix for the sql like operator
[ https://issues.apache.org/jira/browse/FLINK-32824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752618#comment-17752618 ] Shuai Xu commented on FLINK-32824: -- Hi, lincoln. I'd like to fix this issue, could you assign it to me? > Port Calcite's fix for the sql like operator > > > Key: FLINK-32824 > URL: https://issues.apache.org/jira/browse/FLINK-32824 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.18.0, 1.17.1 >Reporter: lincoln lee >Priority: Major > Fix For: 1.19.0 > > > We should port the bugfix of the SQL LIKE operator: > https://issues.apache.org/jira/browse/CALCITE-1898 > {code} > The LIKE operator must match '.' (period) literally, not treat it as a > wild-card. Currently it treats it the same as '_'. > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
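The CALCITE-1898 semantics can be illustrated with a small LIKE-to-regex translation. This is a hedged sketch, not Calcite's or Flink's actual implementation (it ignores ESCAPE clauses): in SQL LIKE, '%' matches any sequence, '_' matches exactly one character, and every other character, including '.', must match literally. The bug arises when a naive translation forwards '.' into the regex unescaped, making it behave like '_'.

```java
import java.util.regex.Pattern;

public class LikeToRegex {
    // Translate a SQL LIKE pattern into a Java regex, quoting every
    // non-wildcard character so that '.' stays a literal period.
    public static String toRegex(String likePattern) {
        StringBuilder sb = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                sb.append(".*");        // any sequence of characters
            } else if (c == '_') {
                sb.append('.');         // exactly one character
            } else {
                sb.append(Pattern.quote(String.valueOf(c))); // literal, incl. '.'
            }
        }
        return sb.toString();
    }

    public static boolean like(String value, String pattern) {
        return value.matches(toRegex(pattern));
    }
}
```

With the quoting in place, `like("abc", "a.c")` is false while `like("abc", "a_c")` is true; without it, both would match, which is the bug being ported from Calcite.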
[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Description: I compiled a plan for an INSERT statement and executed the plan, but the SQL client became unresponsive when executing the EXECUTE PLAN statement. I confirmed that the Flink job is running normally by checking the Flink dashboard. The only issue is that the SQL client becomes stuck and cannot accept new commands. I printed the stack trace of the SQL client process, and here is a part of it for reference. {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} was: I compiled a plan for an insert statement and then executed the plan. However, the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. I have checked the Flink dashboard and confirmed that the job is running normally. The only issue is that the SQL client is stuck and cannot accept new commands. 
Here is a part of the stack trace for reference.: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal
[jira] [Updated] (FLINK-32219) SQL client hangs when executing EXECUTE PLAN
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Description: I compiled a plan for an insert statement and then executed the plan. However, the SQL client becomes unresponsive when executing the EXECUTE PLAN statement. I have checked the Flink dashboard and confirmed that the job is running normally. The only issue is that the SQL client is stuck and cannot accept new commands. Here is a part of the stack trace for reference.: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} was: I compiled plan for insert statement firstly and then I execute the plan. However the sql client is pending after running execute plan statement. 
Here is the part of stacktrace: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code}
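The stack traces in this report all bottom out in an unbounded CompletableFuture.get(). As a minimal standalone illustration (plain Java, not Flink code), a get() on a future that is never completed parks the calling thread in exactly the WAITING (parking) state shown above; a timed get() is used here so the demo terminates:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingGetDemo {
    public static void main(String[] args) throws Exception {
        // A future that is never completed, standing in for the result
        // future the SQL client waits on. An unbounded get() would park
        // the thread forever via CompletableFuture$Signaller.
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            // Bounded wait only so this demo finishes.
            result.get(100, TimeUnit.MILLISECONDS);
            throw new AssertionError("should have timed out");
        } catch (TimeoutException expected) {
            System.out.println("timed out, where an unbounded get() would hang");
        }
    }
}
```

Supplying a timeout (or completing the future exceptionally on failure) is the usual way to avoid this class of hang.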
[jira] [Updated] (FLINK-32219) SQL client hangs after executing a compiled insert plan
[ https://issues.apache.org/jira/browse/FLINK-32219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuai Xu updated FLINK-32219: - Affects Version/s: 1.17.1 > SQL client hangs after executing a compiled insert plan > - > > Key: FLINK-32219 > URL: https://issues.apache.org/jira/browse/FLINK-32219 > Project: Flink > Issue Type: Bug >Affects Versions: 1.17.1 > Reporter: Shuai Xu >Priority: Major > > I compiled a plan for an insert statement and then executed the plan. > However, the SQL client hangs after running the EXECUTE PLAN statement. Here > is part of the stack trace: > {code:java} > "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 > waiting on condition [0x000173e01000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00076e72af20> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) > at > org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) > at > org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) > at java.util.Iterator.forEachRemaining(Iterator.java:115) > at > org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) > at > org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) > at > 
org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) > at > org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) > at > org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) > at > org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) > at > org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown > Source) > at > org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) > at > org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown > Source) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown > Source) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32219) SQL client hangs after executing a compiled insert plan
Shuai Xu created FLINK-32219: Summary: SQL client hangs after executing a compiled insert plan Key: FLINK-32219 URL: https://issues.apache.org/jira/browse/FLINK-32219 Project: Flink Issue Type: Bug Reporter: Shuai Xu I compiled a plan for an insert statement and then executed the plan. However, the SQL client hangs after running the EXECUTE PLAN statement. Here is part of the stack trace: {code:java} "pool-2-thread-1" #30 prio=5 os_prio=31 tid=0x0001172e5000 nid=0x6d03 waiting on condition [0x000173e01000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00076e72af20> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.InsertResultProvider.hasNext(InsertResultProvider.java:83) at org.apache.flink.table.api.internal.InsertResultProvider.access$200(InsertResultProvider.java:37) at org.apache.flink.table.api.internal.InsertResultProvider$Iterator.hasNext(InsertResultProvider.java:106) at java.util.Iterator.forEachRemaining(Iterator.java:115) at org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:115) at org.apache.flink.table.gateway.service.result.ResultFetcher.fromTableResult(ResultFetcher.java:163) at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:542) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:440) at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl$$Lambda$389/1391083077.apply(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) at org.apache.flink.table.gateway.service.operation.OperationManager$$Lambda$390/208625838.call(Unknown Source) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) at org.apache.flink.table.gateway.service.operation.OperationManager$Operation$$Lambda$392/670621032.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31956) Extend the CompiledPlan to read from/write to Flink's FileSystem
[ https://issues.apache.org/jira/browse/FLINK-31956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717082#comment-17717082 ] Shuai Xu commented on FLINK-31956: -- Hi Jane, I have taken FLINK-31952, which is similar to this task. I think it could be achieved by modifying the StreamPlanner and TableEnvironmentImpl. Could I take the task? > Extend the CompiledPlan to read from/write to Flink's FileSystem > > > Key: FLINK-31956 > URL: https://issues.apache.org/jira/browse/FLINK-31956 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Client, Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Priority: Major > Fix For: 1.18.0 > > > At present, COMPILE/EXECUTE PLAN FOR '${plan.json}' only supports writing > to/reading from a local file without the scheme. We propose to extend the > support for Flink's FileSystem. > {code:sql} > -- before > COMPILE PLAN FOR '/tmp/foo/bar.json' > EXECUTE PLAN FOR '/tmp/foo/bar.json' > -- after > COMPILE PLAN FOR 'file:///tmp/foo/bar.json' > COMPILE PLAN FOR 'hdfs:///tmp/foo/bar.json' > COMPILE PLAN FOR 's3:///tmp/foo/bar.json' > COMPILE PLAN FOR 'oss:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'file:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'hdfs:///tmp/foo/bar.json' > EXECUTE PLAN FOR 's3:///tmp/foo/bar.json' > EXECUTE PLAN FOR 'oss:///tmp/foo/bar.json' {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan
[ https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717031#comment-17717031 ] Shuai Xu edited comment on FLINK-31952 at 4/27/23 7:31 AM: --- Hi Jane, I have some ideas for this task; here is how I see it. Purpose of the task: support explain [plan for | ] '/path-to-json.json'. The steps I think could resolve the task: 1. Modify the parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest. 2. Translate the SQL to an Operation. 3. TableEnv#explainPlan is a ready API; we just get the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task? was (Author: JIRAUSER300096): Hi Jane, I have some ideas for the task and this is how I consider it. Purpose of the task: Support explain [plan for | ] '/path-to-json.json' The steps I think could resolve the task : 1. Modify parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest 2. Translate the sql to Operation. 3. TableEnv#explainPlan is a ready API, we just got the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally add the test in and SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task? > Support 'EXPLAIN' statement for CompiledPlan > > > Key: FLINK-31952 > URL: https://issues.apache.org/jira/browse/FLINK-31952 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Priority: Major > Fix For: 1.18.0 > > > Support the explain SQL syntax towards serialized CompiledPlan > {code:sql} > EXPLAIN [ | PLAN FOR] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31952) Support 'EXPLAIN' statement for CompiledPlan
[ https://issues.apache.org/jira/browse/FLINK-31952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717031#comment-17717031 ] Shuai Xu commented on FLINK-31952: -- Hi Jane, I have some ideas for this task; here is how I see it. Purpose of the task: support explain [plan for | ] '/path-to-json.json'. The steps I think could resolve the task: 1. Modify the parserImpl.ftl template to add syntax support, which can be verified by FlinkSqlParserImplTest. 2. Translate the SQL to an Operation. 3. TableEnv#explainPlan is a ready API; we just get the internal plan from the Operation and pass it to TableEnv#explainPlan. 4. Finally, add tests in SqlOperationConverterTest and TableEnvironmentTest for verification. Could I take the task? > Support 'EXPLAIN' statement for CompiledPlan > > > Key: FLINK-31952 > URL: https://issues.apache.org/jira/browse/FLINK-31952 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jane Chan >Priority: Major > Fix For: 1.18.0 > > > Support the explain SQL syntax towards serialized CompiledPlan > {code:sql} > EXPLAIN [ | PLAN FOR] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] flink pull request #6360: [FLINK-9884] [runtime] fix slot request may not be...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/6360 [FLINK-9884] [runtime] fix slot request may not be removed when it has already been assigned in slot manager ## What is the purpose of the change *(This pull request fixes the bug that a slot request may not be removed from pendingSlotRequests in the slot manager when it has already been assigned.)* ## Verifying this change This change added tests and can be verified as follows: - *Added test in SlotManagerTest.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-9884 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6360.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6360 commit 58e24424401d28647e376a9ee32d7b70d9ca2724 Author: shuai-xu Date: 2018-07-18T07:54:55Z [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager commit 4d53107a2817e0e3def8ed31926a7b4a97251c1c Author: shuai-xu Date: 2018-07-18T07:56:50Z adjust the import order ---
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5931 @StephanEwen, Good idea, I prefer the first one. As for the second one, the pending request may have been fulfilled by the time the task executor is killed, so the job master cannot cancel the pending request. And if the job master fails over the job at the same time as the resource manager requests a new container, it may ask for one more container than needed. ---
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5931 @GJL In Blink, we solve this problem like this. When a container completes, we first check whether the container has registered. If it had registered before, the RM will not request a new container, as the job master will ask for it during failover. If it has not registered, the RM will request a new one. ---
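The decision rule described in this comment can be sketched as a small standalone class. All names here (ContainerPolicy, onContainerCompleted, etc.) are hypothetical illustrations of the described behavior, not the actual Blink/Flink resource manager API:

```java
import java.util.HashSet;
import java.util.Set;

public class ContainerPolicy {
    // Containers whose task executor has registered with the RM.
    private final Set<String> registered = new HashSet<>();

    public void onRegistered(String containerId) {
        registered.add(containerId);
    }

    /**
     * Called when a container completes. Returns true if the RM should
     * request a replacement container.
     */
    public boolean onContainerCompleted(String containerId) {
        // Registered before: the job master will re-request slots on
        // failover, so the RM must not ask for an extra container.
        // Never registered: the container was lost before it could do
        // any work, so the RM requests a new one.
        return !registered.remove(containerId);
    }

    public static void main(String[] args) {
        ContainerPolicy policy = new ContainerPolicy();
        policy.onRegistered("container_01");
        if (policy.onContainerCompleted("container_01")) throw new AssertionError();
        if (!policy.onContainerCompleted("container_02")) throw new AssertionError();
        System.out.println("policy behaves as described");
    }
}
```

The point of the split is to avoid double-requesting: either the job master drives recovery (registered containers) or the RM does (unregistered ones), never both.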
[GitHub] flink pull request #5951: [FLINK-9293] [runtime] SlotPool should check slot ...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5951 [FLINK-9293] [runtime] SlotPool should check slot id when accepting a slot offer with existing allocation id ## What is the purpose of the change *This pull request fixes the issue that the job master accepts multiple slot offers with the same allocation id, which makes the later slots leak.* ## Verifying this change This change added tests and can be verified as follows: - *Run the SlotPoolTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-9293 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5951.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5951 commit 22ae227e2a19ddf15890dcce779536687328d7ac Author: shuai.xus Date: 2018-05-03T09:13:08Z [FLINK-9293] [runtime] SlotPool should check slot id when accepting a slot offer with existing allocation id ---
[GitHub] flink pull request #5693: [FLINK-8938] [runtime] not remove job graph during...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5693 [FLINK-8938] [runtime] not remove job graph during job master failover ## What is the purpose of the change *This pull request fixes a bug where, during job master failover, the job graph may be deleted so that the next job master can no longer recover the job.* ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8938 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5693.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5693 commit 2fc5ec8795b5a53ce0f653a0e0ad5c1346e826c8 Author: shuai.xus Date: 2018-03-14T04:16:55Z [FLINK-8938] [runtime] not remove job graph during job master failover ---
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/5190 ---
[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5190 @tillrohrmann , In fact, the original problem I was concerned with has been fixed by your commit "[FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor", which sets the RestOptions#REST_ADDRESS to the host of the container. ---
[GitHub] flink pull request #5297: [FLINK-8434] Take over the running task manager af...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5297 [FLINK-8434] Take over the running task manager after yarn app master failover ## What is the purpose of the change *This pull request enables the YARN resource manager to take over running containers from the previous attempt.* ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8434 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5297.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5297 commit 2b05e621f57f1d6885d37c3aa7972e6755bc1a20 Author: shuai.xus Date: 2018-01-15T08:54:40Z [FLINK-8434] Take over the running task manager after yarn app master failover ---
[GitHub] flink pull request #2675: [FLINK-4504][dataset api]Support user to decide wh...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/2675 ---
[GitHub] flink pull request #2674: [FLINK-4444][runtime]Add a InputChannel and Result...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/2674 ---
[GitHub] flink issue #5271: [FLINK-8399] [runtime] use independent configurations for...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5271 @tillrohrmann thank you for reviewing, I have modified it. ---
[GitHub] flink issue #5190: [FLINK-8289] [runtime] set the rest.address to the host o...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5190 @tillrohrmann @EronWright , this makes it clearer, but it does not seem to solve the problem completely. We need to set RestOptions#ADDRESS to the address of the rest server so the client can access it. But we get 0.0.0.0 from getRestAddress of the rest server if we let the rest server bind to RestOptions#BIND_ADDRESS with 0.0.0.0, unless we add another method to the rest server that returns the advertised address. ---
[GitHub] flink issue #5170: [FLINK-8266] [runtime] add network memory to ResourceProf...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5170 @tillrohrmann sorry, the conflict is resolved now ---
[GitHub] flink issue #5186: [FLINK-8288] [runtime] register job master rest endpoint ...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5186 @NicoK , thank you for reviewing, I have adjusted the indent. ---
[GitHub] flink pull request #5271: [FLINK-8399] [runtime] use independent configurati...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5271 [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager ## What is the purpose of the change *This pull request separates the timeouts for slot requests to the task manager, slot requests to be discarded, and task managers to be released in the slot manager into three different configurations.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8399 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5271.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5271 commit f7024439ead5e3848c705659bfe221b8ce50f154 Author: shuai.xus Date: 2018-01-10T07:43:20Z [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager ---
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/5190#discussion_r158627004 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) throws Exception { // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); + configuration.setString(RestOptions.REST_ADDRESS, commonRpcService.getAddress()); --- End diff -- I think what we want to get from the RestServerEndpoint is its server address. One way is to let its bind address be the real IP of the machine. The common RPC address is now the real IP. ---
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5190 [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine ## What is the purpose of the change This pull request sets the rest.address to the host of the Dispatcher or JobMaster, so that the real address of the rest server is used instead of 0.0.0.0:9067 or 127.0.0.0:9067. ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8289 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5190.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5190 commit 0956b94a910df071a61aae90cac7fef2b795ed0c Author: shuai.xus Date: 2017-12-20T09:37:10Z [FLINK-8289] [runtime] set the rest.address to the host of the rest server machine ---
[GitHub] flink pull request #5186: [FLINK-8288] [runtime] register job master rest en...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5186 [FLINK-8288] [runtime] register job master rest endpoint url to yarn ## What is the purpose of the change This pull request passes the endpoint URL of the job master rest server to the resource manager so it can register the URL with YARN or Mesos. ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8288 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5186.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5186 commit 9a65d995454fe8d49ba6b84094ea7e5ee687836e Author: shuai.xus Date: 2017-12-20T02:10:35Z [FLINK-8288] [runtime] register job master rest endpoint url to yarn ---
[GitHub] flink pull request #5170: [FLINK-8266] [runtime] add network memory to Resou...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5170 [FLINK-8266] [runtime] add network memory to ResourceProfile for the input and output memory of a task ## What is the purpose of the change This pull request adds a network memory field to ResourceProfile, so the job master can set the network memory of a task according to the number of input channels and output sub-partitions. ## Verifying this change This change can be verified by running ResourceProfileTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8266 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5170.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5170 commit 93822bfac6a0794b2b2047e046dcef93c5313185 Author: shuai.xus Date: 2017-12-15T10:43:27Z [FLINK-8266] add network memroy to ResourceProfile for the input and output memory of a task ---
[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4991#discussion_r156848154 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -52,6 +63,15 @@ /** How many native memory in mb are needed */ private final int nativeMemoryInMB; + /** An extensible field for user-specified resources from {@link ResourceSpec}. */ + private final Map<String, Resource> extendedResources = new TreeMap<>(); --- End diff -- HashMap is not ordered, so when comparing two ResourceProfiles, the result may differ if the extended resources were added in a different order. Using TreeMap guarantees that the comparison of two ResourceProfiles is always the same if the extended resources are the same. ---
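The ordering argument above can be illustrated with a small standalone example: a `TreeMap` iterates in key order regardless of insertion order, so any order-sensitive comparison or serialization of two maps with equal contents behaves identically. This is a generic Java illustration, not Flink code.

```java
import java.util.Map;
import java.util.TreeMap;

// TreeMap iteration order depends only on the keys, never on insertion
// order, so two maps with the same contents always render identically.
public class MapOrdering {
    public static void main(String[] args) {
        Map<String, Double> a = new TreeMap<>();
        a.put("GPU", 0.5);
        a.put("FPGA", 1.0);

        Map<String, Double> b = new TreeMap<>();
        b.put("FPGA", 1.0); // inserted in the opposite order
        b.put("GPU", 0.5);

        // Same iteration order, hence the same rendering for both maps.
        System.out.println(a); // {FPGA=1.0, GPU=0.5}
        System.out.println(b); // {FPGA=1.0, GPU=0.5}
        System.out.println(a.toString().equals(b.toString())); // true
    }
}
```

With a `HashMap`, the iteration order is unspecified, so the two renderings could differ.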
[GitHub] flink pull request #5139: [FLINK-8224] [runtime] shutdown application when j...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5139 [FLINK-8224] [runtime] shutdown application when job terminated in job mode ## What is the purpose of the change The current job cluster entrypoint doesn't call the resource manager to shut down the application, so the resource manager has no chance to set the application status in the outer resource management system such as YARN/Mesos. This may make YARN still consider the application as running even after the job is finished. ## Verifying this change This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8224 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5139.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5139 commit b047b2a50791f4eeeb4c3a984d060ffdbf57ea26 Author: shuai.xus Date: 2017-12-08T10:02:42Z [FLINK-8224] [runtime] shutdown application when job terminated in job mode ---
[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4991#discussion_r155708446 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java --- @@ -61,16 +80,26 @@ * @param heapMemoryInMB The size of the heap memory, in megabytes. * @param directMemoryInMB The size of the direct memory, in megabytes. * @param nativeMemoryInMB The size of the native memory, in megabytes. +* @param memoryForInputInMB The size of the memory for input, in megabytes. +* @param memoryForOutputInMB The size of the memory for output, in megabytes. */ public ResourceProfile( double cpuCores, int heapMemoryInMB, int directMemoryInMB, - int nativeMemoryInMB) { + int nativeMemoryInMB, + int memoryForInputInMB, + int memoryForOutputInMB, --- End diff -- I think ResourceSpec contains the resources users need to run their code, while ResourceProfile contains the resources for running a task. So ResourceProfile should also contain the part of the resources used by the Flink system. We divide this part into memoryForInputInMB and memoryForOutputInMB, and separate them from heap memory and direct memory so that different resource managers can choose different strategies. For example, a per-job resource manager needs all of these resources when allocating a task manager, but a session resource manager may not consider memoryForInputInMB and memoryForOutputInMB when assigning a slot, as that part is decided when the session cluster is created. Do you think it makes sense? ---
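A rough sketch of that split, with made-up class and field names mirroring the discussion (this is not Flink's actual ResourceProfile, just an illustration of the two sizing strategies):

```java
// Hypothetical sketch: a task profile holding both the user-declared
// operator memory and the framework-side memory for talking to
// upstreams/downstreams, with the two sizing strategies described above.
public class ProfileSketch {
    final int heapMB;
    final int directMB;
    final int nativeMB;
    final int inputMB;   // "memoryForInputInMB" in the discussion
    final int outputMB;  // "memoryForOutputInMB" in the discussion

    ProfileSketch(int heapMB, int directMB, int nativeMB, int inputMB, int outputMB) {
        this.heapMB = heapMB;
        this.directMB = directMB;
        this.nativeMB = nativeMB;
        this.inputMB = inputMB;
        this.outputMB = outputMB;
    }

    // A per-job resource manager sizes a new container for everything.
    int totalForNewContainerMB() {
        return heapMB + directMB + nativeMB + inputMB + outputMB;
    }

    // A session resource manager can ignore the network part, since it
    // was fixed when the session cluster was created.
    int totalForExistingSlotMB() {
        return heapMB + directMB + nativeMB;
    }

    public static void main(String[] args) {
        ProfileSketch p = new ProfileSketch(100, 50, 10, 20, 20);
        System.out.println(p.totalForNewContainerMB()); // 200
        System.out.println(p.totalForExistingSlotMB()); // 160
    }
}
```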
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann , I agree with you that adding a builder looks better. I changed it according to your comments. Do you think it works now? ---
[GitHub] flink issue #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClus...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4988 This pr is very helpful for us. Thank you. I have tried it on our cluster in YARN job mode, and it works well. My only suggestion is to change the default value of rest.address to 0.0.0.0 instead of localhost. If the REST server starts on localhost, it can only be accessed from the local machine; on 0.0.0.0, it can be accessed remotely. ---
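The difference between the two bind addresses can be demonstrated with plain Java sockets (a generic illustration, unrelated to Flink's REST server implementation): a socket bound to 127.0.0.1 only listens on the loopback interface, while one bound to 0.0.0.0 listens on all interfaces and is therefore reachable from other hosts.

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// A socket bound to 127.0.0.1 is reachable only from the local machine;
// one bound to 0.0.0.0 (the wildcard address) listens on all interfaces.
public class BindAddressDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket local = new ServerSocket()) {
            local.bind(new InetSocketAddress("127.0.0.1", 0)); // loopback only
            System.out.println(local.getInetAddress().getHostAddress()); // 127.0.0.1
        }
        try (ServerSocket all = new ServerSocket()) {
            all.bind(new InetSocketAddress("0.0.0.0", 0)); // all interfaces
            System.out.println(all.getInetAddress().getHostAddress()); // 0.0.0.0
        }
    }
}
```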
[GitHub] flink pull request #4985: [FLINK-8027] Generalize existing rest handlers to ...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4985#discussion_r153708910 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple {@link TaskExecutorGateway} implementation for testing purposes.
+ */
+public class TestingTaskExecutorGateway implements TaskExecutorGateway {
+
+	private final String address;
+
+	private final String hostname;
+
+	public TestingTaskExecutorGateway() {
+		this("foobar:1234", "foobar");
+	}
+
+	public TestingTaskExecutorGateway(String address, String hostname) {
+		this.address = Preconditions.checkNotNull(address);
+		this.hostname = Preconditions.checkNotNull(hostname);
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		// noop
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+		// noop
+	}
+
+	@Override
+	public void disconnectJobManager(JobID jobId, Exception cause) {
[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/4991 [FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager Notes: this pull request contains #4911 since it depends on it. ## What is the purpose of the change This pull request makes task resources extendable with ResourceSpec (#4911), and adds two fields for calculating the memory needed for an operator to communicate with its upstreams and downstreams. ## Brief change log - *Add an extendedResources field for extendable resources in ResourceSpec* - *Add memoryForInputInMB and memoryForOutputInMB for the memory needed for an operator to communicate with its upstreams and downstreams* - *Add a fromResourceSpec method for transforming a ResourceSpec into a ResourceProfile* ## Verifying this change This change added tests and can be verified as follows: - *Added test in ResourceProfileTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-7928 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4991.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4991 commit 3e1d61a33f18b351424d4684cbaebc22674f582c Author: shuai.xus Date: 2017-10-25T06:56:35Z [FLINK-7878] [api] make resource type extendible in ResourceSpec Summary: Now, flink only support user define CPU and MEM, but some user need to specify the GPU, FPGA and so on resources. So it need to make the resouce type extendible in the ResourceSpec. Add a extend field for new resources. Test Plan: UnitTest Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D327427 commit d769fe5d0184cd6ac264fd42552d290ae6978fbb Author: shuai.xus Date: 2017-11-08T09:10:01Z make Resource abstract and add GPUResource FPGAResource commit f897d1fa1742c8186c93bb60abfd8719f156c7da Author: shuai.xus Date: 2017-11-08T09:20:22Z enhance test commit b8e882b9f39f5588338297ce227e200c6527b84b Author: shuai.xus Date: 2017-11-10T02:00:08Z make create protected commit 41cf6e4c7e68ef84d9d84e909b417fc6ddc794a6 Author: shuai.xus Date: 2017-11-10T03:02:21Z make constructor public commit 931e279e5a85f38e6cd9e53169fd37b8ce2d87ad Author: shuai.xus Date: 2017-10-26T09:38:04Z [FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager Summary: ResourceProfile denotes the resource requirements of a task. It should contains: 1. The resource for the operators: the resources in ResourceSpec (please refer to jira-7878) 2. The resource for the task to communicate with its upstreams. 3. The resource for the task to communicate with its downstreams. Now the ResourceProfile only contains the first part. Adding the last two parts. 
Test Plan: UnitTests Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D330364 commit 6665d570882efa49e35251092385efc8fb6adeb8 Author: shuai.xus Date: 2017-10-27T07:43:25Z modify compare commit 739564db031febd5bb029f08df3ced1ef539c7e6 Author: shuai.xus Date: 2017-10-30T04:01:42Z add more denotes commit c39c3597c1094bb258556d8d6dc12e5305903ea8 Author: shuai.xus Date: 2017-11-10T02:55:26Z rebase with 7878 ---
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann, I made Resource abstract and added GPUResource and FPGAResource, so users can only add such predefined resources. How about it? ---
[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4911#discussion_r149610252 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java --- @@ -61,18 +79,17 @@ /** How many state size in mb are used */ private final int stateSizeInMB; + private final Map<String, Resource> extendedResources = new HashMap<>(1); --- End diff -- Done, and added a test for it. ---
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann, there is not a generic way for both YARN and Mesos, as their resource allocation interfaces are different. I think the YARN/Mesos resource managers should handle it in their own way. For example, the YarnResourceManager can add all extended resources to the YARN Resource class by calling setResourceValue(name, value). Then, whenever YARN supports a new resource type, users can define it without code changes in Flink. ---
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 @tillrohrmann, yes, the Resource is a little too generic and prone to typos. However, resources are various and closely related to the platform (YARN/Mesos); only a GPUResource and an FPGAResource may not satisfy users' needs. For example, we have at least two types of FPGA resources in our cluster. We can consider users who need to specify extended resources as advanced users. General users only need to know vcores and memory, which are already defined in ResourceSpec. Advanced users should be familiar not only with Flink but also with the resource platform; they should know the resource types YARN/Mesos supports. And if the Flink resource manager passes all the extended resources to YARN/Mesos when starting a container, it need not change when a new resource type is added, as long as YARN/Mesos can recognize it from the extended resources. There has to be a compromise between extensibility and ease of use. I suggest we add a general GPUResource and FPGAResource for common use while still keeping Resource open for extension. Does this make sense? ---
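The suggested compromise (a small abstract base class plus a couple of predefined resource types, still open for subclassing) can be sketched as below. All names here are illustrative, not Flink's actual classes.

```java
import java.util.Objects;

// A minimal sketch of the compromise discussed above: an abstract
// Resource base with predefined GPU/FPGA types for general use, while
// advanced users can still subclass for platform-specific resources.
public abstract class ResourceSketch {
    private final String name;
    private final double value;

    protected ResourceSketch(String name, double value) {
        this.name = Objects.requireNonNull(name);
        this.value = value;
    }

    public String getName() { return name; }
    public double getValue() { return value; }

    /** Predefined GPU resource for general use. */
    public static final class GPU extends ResourceSketch {
        public GPU(double units) { super("GPU", units); }
    }

    /**
     * Predefined FPGA resource; a cluster may have several FPGA variants,
     * so the concrete name is user-supplied (e.g. "FPGA_TYPE_A").
     */
    public static final class FPGA extends ResourceSketch {
        public FPGA(String variant, double units) { super(variant, units); }
    }

    public static void main(String[] args) {
        ResourceSketch gpu = new GPU(0.5);
        System.out.println(gpu.getName() + "=" + gpu.getValue()); // GPU=0.5
    }
}
```

A resource manager integration would then only need to iterate the declared resources by name and value, passing them through to YARN/Mesos unchanged.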
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4937#discussion_r148718854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -262,23 +263,36 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - ScheduledUnit task, --- End diff -- Why put the ScheduledUnit as a parameter here? I think the interface of the slot pool should be clean: it should only have resource-related parameters, not scheduling-related parameters. ---
[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4887#discussion_r148716173 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -874,6 +894,13 @@ public void handleError(final Exception exception) { */ public abstract boolean stopWorker(ResourceID resourceID); + /** +* Cancel the allocation of a resource. If the resource allocation has not been fulfilled, it should be cancelled. +* +* @param resourceProfile The resource description of the previous allocation +*/ + public abstract void cancelNewWorker(ResourceProfile resourceProfile); --- End diff -- I added a comment that it is the slot manager. ---
[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4887#discussion_r148715689 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java --- @@ -52,4 +52,6 @@ * @param cause of the allocation failure */ void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); + + void cancelResourceAllocation(ResourceProfile resourceProfile); --- End diff -- I added a comment that it is the slot manager. ---
[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/4887#discussion_r148715651 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) { PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); if (null != pendingSlotRequest) { - cancelPendingSlotRequest(pendingSlotRequest); + if (pendingSlotRequest.isAssigned()) { + cancelPendingSlotRequest(pendingSlotRequest); + } + else { + resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile()); --- End diff -- Yes, the SlotManager can decide to release more resources than needed. But consider a worst case: 1. The Mesos or YARN cluster does not have enough resources. 2. A job asks for 100 workers of size A. 3. As there are not enough resources, the job fails over; the previous 100 requests are not cancelled, and it asks for another 100. 4. This repeats several times, until the pending requests for workers of size A reach 1. 5. A worker of size B crashes, so the job now only needs 100 workers of size A and 1 worker of size B. But YARN or Mesos thinks the job needs 1 A and 1 B, as the requests were never cancelled. 6. Mesos/YARN now has resources for 110 A, more than 100 A and 1 B, and it begins to assign resources to the job, but it first tries to allocate 1 containers of size A, and the job still cannot be started as it lacks container B. 7. This may keep the job from starting for a long time, even once the cluster has enough resources. 8. And this did happen in our cluster, as our cluster is busy. So I think it's better to keep this protocol, and different resource managers can treat it according to their needs. ---
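The failure mode in that list comes down to request-count drift: if pending container requests are never withdrawn on failover, the resource manager's view diverges from what the job actually needs. A toy illustration of the bookkeeping (profile names and numbers are illustrative, not Flink code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration: without a cancel protocol, pending requests at the
// RM accumulate across failovers and drift away from the real need.
public class StaleRequestDemo {
    final Map<String, Integer> pendingAtRM = new HashMap<>();

    void request(String profile, int count) {
        pendingAtRM.merge(profile, count, Integer::sum);
    }

    void cancel(String profile, int count) {
        pendingAtRM.merge(profile, -count, Integer::sum);
    }

    public static void main(String[] args) {
        StaleRequestDemo rm = new StaleRequestDemo();
        // A job that needs 100 workers of size A fails over 4 times,
        // re-requesting each time without cancelling the stale asks.
        for (int attempt = 0; attempt < 5; attempt++) {
            rm.request("A", 100);
        }
        System.out.println(rm.pendingAtRM.get("A")); // 500, far more than the 100 needed
        // With the cancel protocol, the stale 400 are withdrawn first.
        rm.cancel("A", 400);
        System.out.println(rm.pendingAtRM.get("A")); // 100
    }
}
```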
[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/4937 [FLINK-6434] [runtime] cancel slot allocation if request timed out in ProviderAndOwner ## What is the purpose of the change This pr adds a cancel slot allocation protocol between ProviderAndOwner and SlotPool, so that ProviderAndOwner can cancel slot allocations that are no longer needed, to avoid slot leaking. ## Brief change log - *Let the ProviderAndOwner generate the allocation id before calling allocateSlot on the slot pool.* - *If the allocateSlot call times out, the ProviderAndOwner cancels the previous allocation in the slot pool.* ## Verifying this change This change added tests and can be verified as follows: - *Added unittest in SlotPoolRpcTest* - *Modified the existing SlotPoolTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-6434 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4937.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4937 commit ab3c599d55847451a1194ba55375207267561a71 Author: shuai.xus Date: 2017-10-20T09:12:39Z [FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner Summary: This fix flink jira #6434 1.
Let the ProviderAndOwner generate the allcation id before calling allocateSlot to slot pool. 2. If the allocateSlot call timed out, ProviderAndOwner cancel the previos allocation to slot pool. Test Plan: UnitTest Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D323990 ---
[GitHub] flink pull request #3398: [FLINK-5856] [FLIP-6] return redundant containers ...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/3398 ---
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 I ran the failed test on my machine and it passes; it seems my changes do not influence it. ---
[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/4911 [FLINK-7878] [api] make resource type extendible in ResourceSpec Summary: Currently, Flink only supports user-defined CPU and MEM, but some users need to specify GPU, FPGA, and other resources. So the resource type needs to be made extendible in ResourceSpec. Add an extended field for new resources. ## What is the purpose of the change This pull request adds an extensible field to ResourceSpec, so users can define various resources as long as they are supported by their resource manager. *(for example:)* users can use _text.flatMap().setResource(new ResourceSpec(1, 100, new ResourceSpec.Resource("GPU", 0.5)));_ to define their GPU requirement for the operator. ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests in ResourceSpecTest to verify.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-7878 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4911.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4911 commit 3e1d61a33f18b351424d4684cbaebc22674f582c Author: shuai.xus Date: 2017-10-25T06:56:35Z [FLINK-7878] [api] make resource type extendible in ResourceSpec Summary: Now, flink only support user define CPU and MEM, but some user need to specify the GPU, FPGA and so on resources. So it need to make the resouce type extendible in the ResourceSpec. Add a extend field for new resources. Test Plan: UnitTest Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D327427 ---
[GitHub] flink issue #4887: [FLINK-7870] [runtime] Cancel slot allocation to RM when ...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4887 @tillrohrmann could you help to review this pr? Thank you. ---
[GitHub] flink pull request #4887: [FLINK-7870] [runtime] Cancel slot allocation to R...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/4887 [FLINK-7870] [runtime] Cancel slot allocation to RM when requestSlot timed out in SlotPool ## What is the purpose of the change This pull request adds a cancelSlotRequest RPC protocol between the slot pool and the resource manager. When a pending request times out in the slot pool, it sends a cancelSlotRequest RPC to the resource manager to cancel the previous slot request, so that slot requests do not accumulate in the resource manager. ## Verifying this change This change added tests and can be verified as follows: - *Added a verification in SlotManagerTest to make sure of the cancel logic* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
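The timeout-then-cancel protocol described in the PR can be sketched as follows. The class and method names are illustrative, not Flink's real slot pool or RPC classes: the point is that a timed-out request is removed locally and a cancel is also sent to the remote side, keeping both views of the outstanding requests in sync.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

// Rough sketch of a timeout-then-cancel protocol: pending requests are
// keyed by allocation id; on timeout the request is failed locally AND a
// cancel notification is sent to the (simulated) resource manager.
public class CancelOnTimeoutSketch {
    final Map<UUID, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    CompletableFuture<String> requestSlot(long timeoutMillis, ScheduledExecutorService timer) {
        UUID allocationId = UUID.randomUUID();
        CompletableFuture<String> slot = new CompletableFuture<>();
        pending.put(allocationId, slot);
        timer.schedule(() -> {
            // completeExceptionally returns true only for the first transition,
            // so a slot that was already fulfilled is not cancelled.
            if (slot.completeExceptionally(new TimeoutException("slot request timed out"))) {
                pending.remove(allocationId);
                cancelSlotRequestAtResourceManager(allocationId); // the extra "cancel" message
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return slot;
    }

    void cancelSlotRequestAtResourceManager(UUID allocationId) {
        // In the real protocol this would be an RPC to the resource manager.
        System.out.println("cancel " + allocationId);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CancelOnTimeoutSketch pool = new CancelOnTimeoutSketch();
        CompletableFuture<String> slot = pool.requestSlot(10, timer);
        try {
            slot.get();
        } catch (ExecutionException e) {
            System.out.println("request failed: " + e.getCause().getMessage());
        }
        timer.shutdown();
    }
}
```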
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira- Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4887.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4887 commit daf7fd5745659d94f5a84f669bc90b82b5e69e5e Author: shuai.xus Date: 2017-10-17T09:57:18Z [FLINK-] slot pool cancel slot request to resource manager if timeout Summary: slot pool cancel slot request to resource manager if timeout Test Plan: unit test Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D320749 commit 96f80187bb5ef1c268a62bdaf80151a70a04b002 Author: shuai.xus Date: 2017-10-19T04:13:01Z add more contract ---
[GitHub] flink issue #3580: [FLINK-5791] Support an optimal matching based slot manag...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/3580 @tillrohrmann Could you please help to review this pr? And a question: the pending requests in slot managers are not kept in order, so requests that arrive later may be fulfilled earlier. Does this matter? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3398: [FLINK-5856] [FLIP-6] return redundant containers to yarn...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/3398 @tillrohrmann , sorry for the late reply. I will modify it according to your comments within one week. ---
[GitHub] flink pull request #3539: Flip1: fine gained recovery
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/3539 ---
[GitHub] flink issue #3755: [FLINK-6351] [YARN] Refactoring YarnFlinkApplicationMaste...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/3755 There will soon be a YarnFlinkAppMaster extending AbstractYarnFlinkApplicationMasterRunner for Flink's YARN cluster mode, so there is no need to combine them into one class. ---
[GitHub] flink pull request #3773: [FLINK-5867] [FLINK-5866] [flip-1] Implement Failo...
Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/3773#discussion_r113361061 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java --- @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at last to RUNNING.
+ */
+public class FailoverRegion {
+
+	private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+	//
+
+	/** a unique id for debugging */
+	private final AbstractID id = new AbstractID();
+
+	private final ExecutionGraph executionGraph;
+
+	private final List<ExecutionVertex> connectedExecutionVertexes;
+
+	/** The executor that executes the recovery action after all vertices are in a terminal state */
+	private final Executor executor;
+
+	/** Current status of the job execution */
+	private volatile JobStatus state = JobStatus.RUNNING;
+
+	public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executor = checkNotNull(executor);
+		this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+		LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+	}
+
+	public void onExecutionFail(Execution taskExecution, Throwable cause) {
+		// TODO: check if need to failover the preceding region
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			// delegate the failure to a global fail that will check the restart strategy and not restart
+			executionGraph.failGlobal(cause);
+		}
+		else {
+			cancel(taskExecution.getGlobalModVersion());
+		}
+	}
+
+	private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+		while (true) {
+			JobStatus curStatus = this.state;
+			if (curStatus.equals(JobStatus.CANCELLING)) {
+				if (transitionState(curStatus, JobStatus.CANCELED)) {
+					reset(globalModVersionOfFailover);
+