[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574285#comment-17574285 ] Hongbo commented on FLINK-24926: Confirmed it's fixed in 1.15.1. Thanks! > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > Fix For: 1.15.0 > > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> 
+I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescr
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451247#comment-17451247 ] Hongbo commented on FLINK-24926: Thanks all for figuring it out! I will retry when 1.15 is out. > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> 
+I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lam
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450874#comment-17450874 ] Yun Tang commented on FLINK-24926: -- Thanks for the explanation [~lincoln.86xy]. You might need to pick the fix or wait for the flink-1.15 release, [~liuhb86]. > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' 
second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> +I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at >
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450839#comment-17450839 ] lincoln lee commented on FLINK-24926: - [~yunta] Flink SQL supports self joins. The EmptyRowDataKeySelector is used because the join key is empty once the time attribute is excluded. This issue is fixed by https://issues.apache.org/jira/browse/FLINK-19792, but the fix was not backported to 1.14 for compatibility reasons. > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few 
seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> +I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) 
> at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(E
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449172#comment-17449172 ] Yun Tang commented on FLINK-24926: -- I am not sure whether Flink SQL supports joining a stream with itself. The root cause is that the key selector on the join operator is actually EmptyRowDataKeySelector (see [here|https://github.com/apache/flink/blob/2c5ccd1e909d38ef18486ba55da6db454e33ca94/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java#L67] ), which means that no matter what kind of key is passed, the downstream operator cannot set the correct key. > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult 
= > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> +I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(Inv
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448649#comment-17448649 ] Hongbo commented on FLINK-24926: [~yunta] Yes, we can reproduce every time. And we've seen this error in a Mac machine, a Windows machine, and a Linux cluster. I created a simple repo in Github: [https://github.com/liuhb86/flink24926] > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", "2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) 
*ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> +I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.
[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API
[ https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447187#comment-17447187 ] Yun Tang commented on FLINK-24926: -- [~liuhb86] Can you reproduce this error every time? I used the test code below to verify, but never came across your error message after running it for minutes. {code:java} @Test def testJoinStream(): Unit = { val settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); val tableEnv = TableEnvironment.create(settings) val configuration = tableEnv.getConfig.getConfiguration configuration.setString("table.exec.resource.default-parallelism", "2"); val testTable = tableEnv.from(TableDescriptor.forConnector("datagen") .schema(Schema.newBuilder() .column("ts", DataTypes.TIMESTAMP(3)) .column("v", DataTypes.INT()) .watermark("ts", "ts - INTERVAL '1' second") .build()) .option("rows-per-second", "2") .option("fields.v.kind", "sequence") .option("fields.v.start", "0") .option("fields.v.end", "100") .build()) testTable.printSchema(); tableEnv.createTemporaryView("test", testTable ); val joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); val tableResult = joined.executeInsert(TableDescriptor.forConnector("print").build()); tableResult.await() } {code} > Key group is not in KeyGroupRange when joining two streams with table API > - > > Key: FLINK-24926 > URL: https://issues.apache.org/jira/browse/FLINK-24926 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Hongbo >Priority: Major > > I have a simple test to join two streams by the event time: > > {code:java} > @Test > void testJoinStream() { > var settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > var tableEnv = TableEnvironment.create(settings); > var configuration = tableEnv.getConfig().getConfiguration(); > configuration.setString("table.exec.resource.default-parallelism", 
"2"); > var testTable = tableEnv.from(TableDescriptor.forConnector("datagen") > .schema(Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("v", DataTypes.INT()) > .watermark("ts", "ts - INTERVAL '1' second") > .build()) > .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L) > .option("fields.v.kind", "sequence") > .option("fields.v.start", "0") > .option("fields.v.end", "100") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" + > " join (SELECT ts as ts2, v as v2 from test) on ts = ts2"); > try { > var tableResult = > joined.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } catch (InterruptedException | ExecutionException e) { > throw new RuntimeException(e); > } > } {code} > It failed within a few seconds: > {code:java} > ( > `ts` TIMESTAMP(3) *ROWTIME*, > `v` INT, > WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second > ) > 1> +I[2021-11-16T17:48:24.415, 1, 1] > 1> +I[2021-11-16T17:48:24.415, 0, 1] > 1> +I[2021-11-16T17:48:24.415, 1, 0] > 1> +I[2021-11-16T17:48:24.415, 0, 0] > java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.flink.table.api.TableException: Failed to wait job finish > at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.