[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253757#comment-16253757 ] ASF GitHub Bot commented on FLINK-8063: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183131 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- can you not check the class of the exception? > Client blocks indefinitely when querying a non-existing state > - > > Key: FLINK-8063 > URL: https://issues.apache.org/jira/browse/FLINK-8063 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Critical > Fix For: 1.4.0 > > > When querying for a non-existing state (as in, no state was registered under > queryableStateName) the client blocks indefinitely. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183957 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -439,6 +443,85 @@ public Integer getKey(Tuple2value) throws Exception { } /** +* Tests that the correct exception is thrown if the query +* contains a wrong queryable state name. +*/ + @Test + public void testWrongQueryableStateName() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream > source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor > valueState = + new ValueStateDescriptor<>("any", source.getType()); + + source.keyBy(new KeySelector , Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // wait until the job is running before starting to query. 
+ FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class))); + + CompletableFuture >> future = client.getKvState( + jobId, + "wrong-hankuna", // this is the wrong name. + 0, + VoidNamespace.INSTANCE, + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); + + final CompletableFuture completion = new CompletableFuture<>(); + future.whenComplete((result, throwable) -> { + Assert.assertTrue(throwable != null); + Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation")); --- End diff -- No because it is already "string-ified". ---
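The exchange above turns on the failure reaching the test only as text, so a class check has nothing to match. A minimal sketch of that situation follows. It assumes a hypothetical `UnknownKvStateLocation` exception class and a hypothetical `stringify` step standing in for whatever flattens the error on its way to the client; neither is taken from the Flink code base.

```java
import java.util.concurrent.CompletableFuture;

class StringifiedFailureSketch {

    // Hypothetical stand-in for the server-side exception type.
    static class UnknownKvStateLocation extends Exception {
        UnknownKvStateLocation(String name) {
            super("No location found for state '" + name + "'");
        }
    }

    // Assumption: the error crosses a boundary that keeps only its text,
    // so the receiver gets a generic exception carrying toString() of the original.
    static RuntimeException stringify(Throwable original) {
        return new RuntimeException(original.toString());
    }

    // Simulates a query that fails with the flattened error.
    static CompletableFuture<String> queryWithWrongName() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(stringify(new UnknownKvStateLocation("wrong-hankuna")));
        return future;
    }

    // True if the failure, or its direct cause, still has the original class.
    static boolean matchesByClass(Throwable t) {
        return t instanceof UnknownKvStateLocation
            || t.getCause() instanceof UnknownKvStateLocation;
    }

    // True if the original class name survives in the message text.
    static boolean matchesByMessage(Throwable t) {
        return t.getMessage() != null && t.getMessage().contains("UnknownKvStateLocation");
    }

    public static void main(String[] args) {
        queryWithWrongName().whenComplete((result, throwable) -> {
            System.out.println("by class:   " + matchesByClass(throwable));
            System.out.println("by message: " + matchesByMessage(throwable));
        });
    }
}
```

In this sketch only the message check succeeds, which is the trade-off the reply describes: once the type information is gone, a substring match on the class name is what remains.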
[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253765#comment-16253765 ] ASF GitHub Bot commented on FLINK-8063: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151183695 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- No, an `UnknownKvStateIdException` is thrown in this case. See `KvStateServerHandler`.
[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253770#comment-16253770 ] ASF GitHub Bot commented on FLINK-8063: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184384 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -133,12 +131,11 @@ private void executeActionAsync( operationFuture.whenCompleteAsync( (t, throwable) -> { if (throwable != null) { - if (throwable instanceof CancellationException) { - result.completeExceptionally(throwable); - } else if (throwable.getCause() instanceof UnknownKvStateIdException || + if ( + throwable.getCause() instanceof UnknownKvStateIdException || --- End diff -- The `UnknownKvStateLocationException` is thrown in the case we do not have a server for a particular `keyGroup`.
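The handler in the quoted diff inspects `throwable.getCause()` rather than `throwable` itself. The sketch below uses only `java.util.concurrent.CompletableFuture` to show one reason such a check is common: a failure observed on a dependent stage arrives wrapped in a `CompletionException`. The two exception classes are hypothetical stand-ins named after the ones discussed above, not the Flink classes, and `matchesNamedException` only illustrates the `instanceof` test; what the real handler does after a match is not shown in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CauseUnwrappingSketch {

    // Hypothetical stand-ins; they only share names with the exceptions discussed above.
    static class UnknownKvStateIdException extends Exception {}
    static class UnknownKvStateLocationException extends Exception {}

    // Mirrors the shape of the check in the diff: the decision is made on the cause.
    static boolean matchesNamedException(Throwable throwable) {
        Throwable cause = throwable.getCause();
        return cause instanceof UnknownKvStateIdException
            || cause instanceof UnknownKvStateLocationException;
    }

    // Fails a source future and returns what a callback on a dependent stage observes.
    static Throwable observedOnDependentStage(Exception failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        // thenApply creates a dependent stage; its failure wraps the source's failure.
        CompletableFuture<String> dependent = source.thenApply(s -> s);
        Throwable[] seen = new Throwable[1];
        dependent.whenComplete((result, throwable) -> seen[0] = throwable);
        source.completeExceptionally(failure);
        return seen[0];
    }

    public static void main(String[] args) {
        Throwable observed = observedOnDependentStage(new UnknownKvStateIdException());
        System.out.println("wrapped: " + (observed instanceof CompletionException));
        System.out.println("matches: " + matchesNamedException(observed));
    }
}
```

A callback registered directly on the future that was completed with `completeExceptionally` receives the raw exception instead, so a check on `getCause()` alone assumes the wrapped form.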
[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253781#comment-16253781 ] ASF GitHub Bot commented on FLINK-8063: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5021#discussion_r151184658 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java --- @@ -1372,84 +1492,60 @@ public String fold(String accumulator, Tuple2value) throws Excep / General Utility Methods // - private static CompletableFuture getKvStateWithRetries( + private static CompletableFuture getKvState( final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation keyTypeInfo, final StateDescriptor stateDescriptor, - final Time retryDelay, final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static CompletableFuture retryWithDelay( - final Supplieroperation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { - - final CompletableFuture resultFuture = new CompletableFuture<>(); - - retryWithDelay( - resultFuture, - operation, - retries, - retryDelay, - scheduledExecutor, - failIfUnknownKeyOrNamespace); + final ScheduledExecutor executor) throws InterruptedException { + final CompletableFuture resultFuture = new CompletableFuture<>(); + getKvStateIgnoringCertainExceptions( + resultFuture, client, jobId, queryName, key, keyTypeInfo, + stateDescriptor, failForUnknownKeyOrNamespace, executor); return resultFuture; } - public static void retryWithDelay( - final 
CompletableFuture resultFuture, - final Supplier operation, - final int retries, - final Time retryDelay, - final ScheduledExecutor scheduledExecutor, - final boolean failIfUnknownKeyOrNamespace) { + private static void getKvStateIgnoringCertainExceptions( + final CompletableFuture resultFuture, + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor, + final boolean failForUnknownKeyOrNamespace, + final ScheduledExecutor executor) throws InterruptedException { if (!resultFuture.isDone()) { - final CompletableFuture operationResultFuture = operation.get(); - operationResultFuture.whenCompleteAsync( - (t, throwable) -> { - if (throwable != null) { - if (throwable.getCause() instanceof CancellationException) { - resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause())); - } else if (throwable.getCause() instanceof AssertionError || - (failIfUnknownKeyOrNamespace && throwable.getCause() instanceof
[jira] [Commented] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm
[ https://issues.apache.org/jira/browse/FLINK-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253784#comment-16253784 ] Greg Hogan commented on FLINK-8041: --- Currently the vertex value can be any implementation of {{Comparable}}, not just {{Long}}. I can see the benefit to assigning vertices a label as with {{DataSetUtils#zipWithUniqueId}}. There looks to be a cost or limit on scalability with the second proposal, but it would be great to start a discussion on a PR. > Recommended Improvements for Gelly's Connected Components Algorithm > --- > > Key: FLINK-8041 > URL: https://issues.apache.org/jira/browse/FLINK-8041 > Project: Flink > Issue Type: Improvement > Components: Gelly > Affects Versions: 1.3.2 > Environment: Linux, IntelliJ IDEA > Reporter: Christos Hadjinikolis > Priority: Minor > Fix For: 1.4.0 > > Original Estimate: 336h > Remaining Estimate: 336h > > At the moment, the ConnectedComponents algorithm that comes with Flink's > native Graph API (Gelly) has two issues: > 1. It relies on the user to provide correct values in the vertices > DataSet. Based on how the algorithm works, these values must be of type Long > and be unique for every vertex. If the user provides the same value for > every vertex (e.g. 1) the algorithm still works, but as those values are used > for the identification of the different connected components, one will end up > with a single connected component and will have no clue as to why this > happened. This can be easily fixed in two ways: either by checking that the > values that appear alongside vertex-ids are unique and informing the user if > not, or by generating those values for every vertex before the algorithm is > run. I have a running implementation of the second way and I really think it > is an appropriate solution to this problem. > 2. 
Once the connected components are identified, one has to apply additional > transformations and actions to find out which is the biggest, or the order of > the connected components in terms of their size. Alternatively, the algorithm > can be implemented so that the numerical ids given to each component > reflect their ranking when ordered by size, e.g. connected component 1 > will be the biggest, connected component 2 the second biggest, and > so on. I have also solved this and I think it would make a nice addition.
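Both points in the report can be illustrated without Flink. The sketch below is plain Java, not Gelly code: it assumes a simple min-label propagation over an edge list, uses `long` labels for brevity (the comment above notes that Gelly accepts any `Comparable`), and all class and method names are hypothetical. It shows identical initial labels collapsing into one apparent component, unique labels separating the components, and a renumbering step that ranks components by size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComponentLabelSketch {

    // Min-label propagation: along every edge both endpoints adopt the
    // smaller of their two labels, repeated until nothing changes.
    static long[] propagate(long[] initialLabels, int[][] edges) {
        long[] labels = initialLabels.clone();
        boolean changed = true;
        while (changed) {
            changed = false;
            for (int[] e : edges) {
                long min = Math.min(labels[e[0]], labels[e[1]]);
                if (labels[e[0]] != min) { labels[e[0]] = min; changed = true; }
                if (labels[e[1]] != min) { labels[e[1]] = min; changed = true; }
            }
        }
        return labels;
    }

    // First proposal: generate a unique label per vertex (here, its index).
    static long[] uniqueLabels(int vertexCount) {
        long[] labels = new long[vertexCount];
        for (int i = 0; i < vertexCount; i++) {
            labels[i] = i;
        }
        return labels;
    }

    static long distinctCount(long[] labels) {
        return Arrays.stream(labels).distinct().count();
    }

    // Second proposal: renumber components so that 1 is the largest,
    // 2 the second largest, and so on (ties broken by smaller label).
    static int[] rankBySize(long[] labels) {
        Map<Long, Integer> sizes = new HashMap<>();
        for (long label : labels) {
            sizes.merge(label, 1, Integer::sum);
        }
        List<Long> ordered = new ArrayList<>(sizes.keySet());
        ordered.sort((a, b) -> {
            int bySize = Integer.compare(sizes.get(b), sizes.get(a));
            return bySize != 0 ? bySize : Long.compare(a, b);
        });
        Map<Long, Integer> rank = new HashMap<>();
        for (int i = 0; i < ordered.size(); i++) {
            rank.put(ordered.get(i), i + 1);
        }
        int[] result = new int[labels.length];
        for (int v = 0; v < labels.length; v++) {
            result[v] = rank.get(labels[v]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Six vertices: {0,1} and {2,3,4} are connected, 5 is isolated.
        int[][] edges = {{0, 1}, {2, 3}, {3, 4}};
        long[] same = {1, 1, 1, 1, 1, 1};
        System.out.println("identical labels: " + distinctCount(propagate(same, edges)) + " component(s)");
        long[] labels = propagate(uniqueLabels(6), edges);
        System.out.println("unique labels:    " + distinctCount(labels) + " component(s)");
        System.out.println("ranked by size:   " + Arrays.toString(rankBySize(labels)));
    }
}
```

The ranking step needs the size of every component before any final id can be assigned, which is one place where the scalability concern raised in the comment could arise in a distributed setting.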
[jira] [Updated] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader
[ https://issues.apache.org/jira/browse/FLINK-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-7490: Fix Version/s: 1.5.0 1.3.3 > UDF Agg throws Exception when flink-table is loaded with AppClassLoader > --- > > Key: FLINK-7490 > URL: https://issues.apache.org/jira/browse/FLINK-7490 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1, 1.4.0 >Reporter: Miguel Rui Pereira Marques >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0, 1.3.3, 1.5.0 > > > When a UDF aggregation for the Batch Table API is defined in the > FlinkUserCodeClassLoader and the Table API itself is loaded in the > AppClassLoader (the jar is included in the lib directory) this exception is > triggered: > {panel:title=Exception} > java.lang.Exception: The user defined 'open()' method caused an exception: > Table program cannot be compiled. This is a bug. Please file an issue. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.aggregate.DataSetAggFunction.compile(DataSetAggFunction.scala:35) > at > org.apache.flink.table.runtime.aggregate.DataSetAggFunction.open(DataSetAggFunction.scala:49) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 
3 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 13: > Cannot determine simple type name "org" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) > ... > {panel} > Upon inspecting the code I think this may be due to the usage of > 'getClass.getClassLoader' instead of > 'getRuntimeContext.getUserCodeClassLoader' as an argument 'compile' in the > method 'open' of class > org.apache.flink.table.runtime.aggregate.DataSetAggFunction. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader
[ https://issues.apache.org/jira/browse/FLINK-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-7490. - Resolution: Fixed Fixed in 1.5: 59df4b75fa9c6754c89ea1922e05cfdb22e761da Fixed in 1.4: 084ff68d5434805d9fc4208fd52f04c2201e362c Fixed in 1.3: 4ca1b3e7b54fcff934ab4a33d59463e2ac4c975f
[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule
[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253814#comment-16253814 ] ASF GitHub Bot commented on FLINK-7942: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5019 Looks good. +1 to merge > NPE when apply FilterJoinRule > - > > Key: FLINK-7942 > URL: https://issues.apache.org/jira/browse/FLINK-7942 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: Timo Walther > > Test case *testFilterRule1* fails due to a NPE > {code} > java.lang.RuntimeException: Error while applying rule > FilterJoinRule:FilterJoinRule:filter, args > [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > > AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > $3), 'c0')), 'c1'), 0)), > rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, > $2),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347) > at > org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186) > testFilterRule1(FilterRuleTest.scala:63) > Caused by: java.lang.NullPointerException > at org.apache.calcite.plan.Strong.isNull(Strong.java:110) > at org.apache.calcite.plan.Strong.anyNull(Strong.java:166) > at org.apache.calcite.plan.Strong.isNull(Strong.java:114) > at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99) > at 
org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84) > at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354) > at > org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149) > at > org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) > {code} > but *testFilterRule2* works which has the same query written in SQL. > {code} > class FilterRuleTest extends TableTestBase { > @Test > def testFilterRule1(): Unit = { > val util = batchTestUtil() > val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c) > val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f) > val results = t1 > .leftOuterJoin(t2, 'b === 'e) > .select('c, Merger('c, 'f) as 'c0) > .select(Merger('c, 'c0) as 'c1) > .where('c1 >= 0) > val expected = unaryNode( > "DataSetCalc", > binaryNode( > "DataSetJoin", > unaryNode( > "DataSetCalc", > batchTableNode(0), > term("select", "b", "c") > ), > unaryNode( > "DataSetCalc", > batchTableNode(1), > term("select", "e", "f") > ), > term("where", "=(b, e)"), > term("join", "b", "c", "e", "f"), > term("joinType", "LeftOuterJoin") > ), > term("select", "Merger$(c, Merger$(c, f)) AS c1"), > term("where", ">=(Merger$(c, Merger$(c, f)), 0)") > ) > util.verifyTable(results, expected) > } > @Test > def testFilterRule2(): Unit = { > val util = batchTestUtil() > util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c) > util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f) > util.tableEnv.registerFunction("udf_test", Merger) > val sql = > s""" > |select c1 > |from ( > | select udf_test(c, c0) as c1 > | from ( > |select c, udf_test(b, c) as c0 > | from > | (select a, b, c > |from T1 > |left outer join T2 > |on T1.b = T2.e > | ) tmp > | ) tmp1 > |) tmp2 > |where c1 >= 0 >""".stripMargin > val results = util.tableEnv.sqlQuery(sql) > val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) > \n" + > 
"DataSetJoin(where=[=(b, e)], join=[b, c, e], > joinType=[LeftOuterJoin])\n" + > "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), > 0)])\n" + > "DataSetScan(table=[[_DataSetTable_0]])\n" + > "DataSetCalc(select=[e])\n" + >
[GitHub] flink issue #5019: [FLINK-7942] [table] Reduce aliasing in RexNodes
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5019 Looks good. +1 to merge ---
[GitHub] flink pull request #5019: [FLINK-7942] [table] Reduce aliasing in RexNodes
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5019 ---
[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule
[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253840#comment-16253840 ] ASF GitHub Bot commented on FLINK-7942: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5019 > NPE when apply FilterJoinRule > - > > Key: FLINK-7942 > URL: https://issues.apache.org/jira/browse/FLINK-7942 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: Timo Walther > > Test case *testFilterRule1* fails due to a NPE > {code} > java.lang.RuntimeException: Error while applying rule > FilterJoinRule:FilterJoinRule:filter, args > [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > > AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > $3), 'c0')), 'c1'), 0)), > rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, > $2),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347) > at > org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186) > testFilterRule1(FilterRuleTest.scala:63) > Caused by: java.lang.NullPointerException > at org.apache.calcite.plan.Strong.isNull(Strong.java:110) > at org.apache.calcite.plan.Strong.anyNull(Strong.java:166) > at org.apache.calcite.plan.Strong.isNull(Strong.java:114) > at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99) > at 
org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84) > at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354) > at > org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149) > at > org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) > {code} > but *testFilterRule2* works which has the same query written in SQL. > {code} > class FilterRuleTest extends TableTestBase { > @Test > def testFilterRule1(): Unit = { > val util = batchTestUtil() > val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c) > val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f) > val results = t1 > .leftOuterJoin(t2, 'b === 'e) > .select('c, Merger('c, 'f) as 'c0) > .select(Merger('c, 'c0) as 'c1) > .where('c1 >= 0) > val expected = unaryNode( > "DataSetCalc", > binaryNode( > "DataSetJoin", > unaryNode( > "DataSetCalc", > batchTableNode(0), > term("select", "b", "c") > ), > unaryNode( > "DataSetCalc", > batchTableNode(1), > term("select", "e", "f") > ), > term("where", "=(b, e)"), > term("join", "b", "c", "e", "f"), > term("joinType", "LeftOuterJoin") > ), > term("select", "Merger$(c, Merger$(c, f)) AS c1"), > term("where", ">=(Merger$(c, Merger$(c, f)), 0)") > ) > util.verifyTable(results, expected) > } > @Test > def testFilterRule2(): Unit = { > val util = batchTestUtil() > util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c) > util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f) > util.tableEnv.registerFunction("udf_test", Merger) > val sql = > s""" > |select c1 > |from ( > | select udf_test(c, c0) as c1 > | from ( > |select c, udf_test(b, c) as c0 > | from > | (select a, b, c > |from T1 > |left outer join T2 > |on T1.b = T2.e > | ) tmp > | ) tmp1 > |) tmp2 > |where c1 >= 0 >""".stripMargin > val results = util.tableEnv.sqlQuery(sql) > val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) > \n" + > 
"DataSetJoin(where=[=(b, e)], join=[b, c, e], > joinType=[LeftOuterJoin])\n" + > "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), > 0)])\n" + > "DataSetScan(table=[[_DataSetTable_0]])\n" + > "DataSetCalc(select=[e])\n" + > "DataSetScan(table=[[_DataSetTable_1]])" >
[jira] [Resolved] (FLINK-7942) NPE when apply FilterJoinRule
[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-7942. - Resolution: Fixed Fix Version/s: 1.5.0 1.4.0 Fixed in 1.5: b6a2dc345f37c4b643789e98d02e23a022d31415 Fixed in 1.4: 13962e1ffda62218031bf426ee9c06146c7c5573