[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2022-08-02 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574285#comment-17574285
 ] 

Hongbo commented on FLINK-24926:


Confirmed it's fixed in 1.15.1. Thanks!

> Key group is not in KeyGroupRange when joining two streams with table API
> -
>
> Key: FLINK-24926
> URL: https://issues.apache.org/jira/browse/FLINK-24926
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0
> Reporter: Hongbo
> Priority: Major
> Fix For: 1.15.0
>
>
> I have a simple test to join two streams by the event time:
>  
> {code:java}
> @Test
> void testJoinStream() {
>     var settings = EnvironmentSettings
>             .newInstance()
>             .inStreamingMode()
>             .build();
>     var tableEnv = TableEnvironment.create(settings);
>     var configuration = tableEnv.getConfig().getConfiguration();
>     configuration.setString("table.exec.resource.default-parallelism", "2");
>     var testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
>             .schema(Schema.newBuilder()
>                     .column("ts", DataTypes.TIMESTAMP(3))
>                     .column("v", DataTypes.INT())
>                     .watermark("ts", "ts - INTERVAL '1' second")
>                     .build())
>             .option(DataGenConnectorOptions.ROWS_PER_SECOND, 2L)
>             .option("fields.v.kind", "sequence")
>             .option("fields.v.start", "0")
>             .option("fields.v.end", "100")
>             .build());
>     testTable.printSchema();
>     tableEnv.createTemporaryView("test", testTable);
>     var joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
>             " join (SELECT ts as ts2, v as v2 from test) on ts = ts2");
>     try {
>         var tableResult = joined.executeInsert(TableDescriptor.forConnector("print").build());
>         tableResult.await();
>     } catch (InterruptedException | ExecutionException e) {
>         throw new RuntimeException(e);
>     }
> }
> {code}
> It failed within a few seconds:
> {code:java}
> (
>   `ts` TIMESTAMP(3) *ROWTIME*,
>   `v` INT,
>   WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '1' second
> )
> 1> +I[2021-11-16T17:48:24.415, 1, 1]
> 1> +I[2021-11-16T17:48:24.415, 0, 1]
> 1> +I[2021-11-16T17:48:24.415, 1, 0]
> 1> +I[2021-11-16T17:48:24.415, 0, 0]
> java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
>     at com.microstrategy.realtime.FlinkTest.testJoinStream(FlinkTest.java:123)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at org.junit.jupiter.engine.descriptor.TestMethodTestDescr

[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-30 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17451247#comment-17451247
 ] 

Hongbo commented on FLINK-24926:


Thanks all for figuring it out! I will retry when 1.15 is out.


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450874#comment-17450874
 ] 

Yun Tang commented on FLINK-24926:
--

Thanks for the explanation [~lincoln.86xy].

You might need to pick the fix or wait for the flink-1.15 release, [~liuhb86].


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-29 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450839#comment-17450839
 ] 

lincoln lee commented on FLINK-24926:
-

[~yunta] Flink SQL supports self-joins. The EmptyRowDataKeySelector is used 
because the join key is empty once the time attribute is excluded.

This issue is fixed by https://issues.apache.org/jira/browse/FLINK-19792, but 
the fix was not backported to 1.14 because of compatibility concerns.
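
As a rough illustration of why the key ends up empty, the sketch below models that decision in a few lines. It is not Flink planner code: JoinKeySketch, Equality, and nonTimeKeys are hypothetical names, and treating every equality between two time attributes as "not a join key" is an assumption based on the explanation above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model only; none of these names exist in Flink.
final class JoinKeySketch {

    // One equality predicate from the ON clause, e.g. ts = ts2.
    static final class Equality {
        final String left;
        final String right;

        Equality(String left, String right) {
            this.left = left;
            this.right = right;
        }
    }

    // Keeps only the equalities that do not compare two time attributes.
    // Assumption: time-attribute equalities are handled separately and
    // therefore do not contribute to the join key.
    static List<Equality> nonTimeKeys(List<Equality> conditions, Set<String> timeAttributes) {
        List<Equality> keys = new ArrayList<>();
        for (Equality c : conditions) {
            boolean onTimeAttributes =
                    timeAttributes.contains(c.left) && timeAttributes.contains(c.right);
            if (!onTimeAttributes) {
                keys.add(c);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Set<String> timeAttributes = Set.of("ts", "ts2");

        // The reported query joins only on ts = ts2.
        List<Equality> reported = List.of(new Equality("ts", "ts2"));
        System.out.println("join keys in reported query: "
                + nonTimeKeys(reported, timeAttributes).size());

        // A hypothetical variant that also compares a non-time column.
        List<Equality> variant =
                List.of(new Equality("ts", "ts2"), new Equality("v", "v2"));
        System.out.println("join keys in hypothetical variant: "
                + nonTimeKeys(variant, timeAttributes).size());
    }
}
```

In this model the reported query leaves no non-time equality, which corresponds to the empty key described above. The variant is shown only for contrast and is not a confirmed workaround.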


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-25 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449172#comment-17449172
 ] 

Yun Tang commented on FLINK-24926:
--

I am not sure whether Flink SQL supports joining a stream with itself.

The root cause is that the key selector on the join operator is actually 
EmptyRowDataKeySelector (see 
[here|https://github.com/apache/flink/blob/2c5ccd1e909d38ef18486ba55da6db454e33ca94/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java#L67]
 ), so no matter what kind of key is passed, the downstream 
operator cannot set the correct key.
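
To illustrate the consequence, here is a minimal, self-contained sketch of key-group ownership. It does not use Flink classes: KeyGroupSketch, keyGroupFor, rangeForSubtask, and owns are hypothetical names, the hash is a plain stand-in for the runtime's real hashing, and the max parallelism of 128 is an assumed value. The point is only that a constant key maps to a single key group, which exactly one subtask owns, so any other subtask that receives such a record would find the key group outside its range.

```java
// Simplified model of key-group ownership; not Flink's implementation.
final class KeyGroupSketch {

    // Maps a key to a key group. Stand-in hash: the real runtime hashes
    // differently, but any deterministic hash shows the same effect.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Inclusive [start, end] range of key groups owned by one subtask,
    // splitting maxParallelism key groups evenly across the subtasks.
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtask) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    static boolean owns(int[] range, int keyGroup) {
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value
        int parallelism = 2;      // matches default-parallelism in the test

        // A constant key models a selector that returns the same key for every row.
        Object constantKey = "";
        int keyGroup = keyGroupFor(constantKey, maxParallelism);

        for (int subtask = 0; subtask < parallelism; subtask++) {
            int[] range = rangeForSubtask(maxParallelism, parallelism, subtask);
            System.out.println("subtask " + subtask
                    + " owns [" + range[0] + ", " + range[1] + "]"
                    + ", owns key group " + keyGroup + ": " + owns(range, keyGroup));
        }
    }
}
```

With two subtasks, only one of them owns the key group of the constant key. If records are distributed to both subtasks anyway, the second one has no valid key group for them, which is consistent with the error in the issue title.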


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-24 Thread Hongbo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448649#comment-17448649
 ] 

Hongbo commented on FLINK-24926:


[~yunta] Yes, we can reproduce it every time. We've seen this error on a Mac 
machine, a Windows machine, and a Linux cluster.

I created a simple repo on GitHub: [https://github.com/liuhb86/flink24926]

 


[jira] [Commented] (FLINK-24926) Key group is not in KeyGroupRange when joining two streams with table API

2021-11-21 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447187#comment-17447187
 ] 

Yun Tang commented on FLINK-24926:
--

[~liuhb86] Can you reproduce this error every time?
I used the test code below to verify, but did not come across your error 
message after running it for several minutes.

{code:java}
@Test
def testJoinStream(): Unit = {
  val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build()
  val tableEnv = TableEnvironment.create(settings)
  val configuration = tableEnv.getConfig.getConfiguration
  configuration.setString("table.exec.resource.default-parallelism", "2")

  val testTable = tableEnv.from(TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
      .column("ts", DataTypes.TIMESTAMP(3))
      .column("v", DataTypes.INT())
      .watermark("ts", "ts - INTERVAL '1' second")
      .build())
    .option("rows-per-second", "2")
    .option("fields.v.kind", "sequence")
    .option("fields.v.start", "0")
    .option("fields.v.end", "100")
    .build())
  testTable.printSchema()
  tableEnv.createTemporaryView("test", testTable)

  val joined = tableEnv.sqlQuery("SELECT ts, v, v2 from test" +
    " join (SELECT ts as ts2, v as v2 from test) on ts = ts2")

  val tableResult = joined.executeInsert(TableDescriptor.forConnector("print").build())
  tableResult.await()
}
{code}
 
