[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253757#comment-16253757 ]

ASF GitHub Bot commented on FLINK-8063:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183131
  
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -439,6 +443,85 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
 	}
 
 	/**
+	 * Tests that the correct exception is thrown if the query
+	 * contains a wrong queryable state name.
+	 */
+	@Test
+	public void testWrongQueryableStateName() throws Exception {
+		// Config
+		final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+		final long numElements = 1024L;
+
+		JobID jobId = null;
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStateBackend(stateBackend);
+			env.setParallelism(maxParallelism);
+			// Very important, because cluster is shared between tests and we
+			// don't explicitly check that all slots are available before
+			// submitting.
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
+
+			DataStream<Tuple2<Integer, Long>> source = env
+					.addSource(new TestAscendingValueSource(numElements));
+
+			// Value state
+			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+					new ValueStateDescriptor<>("any", source.getType());
+
+			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
+				private static final long serialVersionUID = 7662520075515707428L;
+
+				@Override
+				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
+					return value.f0;
+				}
+			}).asQueryableState("hakuna", valueState);
+
+			// Submit the job graph
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			jobId = jobGraph.getJobID();
+
+			cluster.submitJobDetached(jobGraph);
+
+			// wait until the job is running before starting to query.
+			FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
+					.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
+					.mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
+					jobId,
+					"wrong-hankuna", // this is the wrong name.
+					0,
+					VoidNamespace.INSTANCE,
+					BasicTypeInfo.INT_TYPE_INFO,
+					VoidNamespaceTypeInfo.INSTANCE,
+					valueState);
+
+			final CompletableFuture<Throwable> completion = new CompletableFuture<>();
+			future.whenComplete((result, throwable) -> {
+				Assert.assertTrue(throwable != null);
+				Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
--- End diff --

can you not check the class of the exception?


> Client blocks indefinitely when querying a non-existing state
> -
>
> Key: FLINK-8063
> URL: https://issues.apache.org/jira/browse/FLINK-8063
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.4.0
>
>
> When querying for a non-existing state (as in, no state was registered under 
> queryableStateName) the client blocks indefinitely.
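
For illustration, a minimal self-contained sketch (plain Java, not the Flink client API) of the reported symptom: a future that is never completed blocks a bare get() forever, which is what the client effectively returned for an unknown state name, while a bounded wait turns the hang into an observable failure.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class IndefiniteBlockSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the future the client hands back when no state is
        // registered under the queried name: nothing ever completes it.
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.get(2, TimeUnit.SECONDS); // a plain get() here would block forever
        } catch (TimeoutException e) {
            System.out.println("query for a non-existing state would hang like this");
        }
    }
}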





[GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...

2017-11-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183957
  

No because it is already "string-ified".
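
To illustrate the point, a minimal sketch of what "string-ified" means here, assuming the server ships the failure back as message text rather than as a serialized exception object: the concrete class does not survive the wire, only its name embedded in the message, so the test can only match on the message.

public class StringifiedExceptionSketch {
    public static void main(String[] args) {
        // Hypothetical message text; the real wording comes from the server side.
        Throwable received = new RuntimeException(
                "UnknownKvStateLocation: No KvState registered under 'wrong-hankuna'.");
        // The original exception class is gone after "string-ification"...
        System.out.println(received.getClass().getSimpleName()); // RuntimeException
        // ...so only a message match is possible, as in the test above.
        System.out.println(received.getMessage().contains("UnknownKvStateLocation")); // true
    }
}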


---


[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253765#comment-16253765 ]

ASF GitHub Bot commented on FLINK-8063:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151183695
  
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---
@@ -133,12 +131,11 @@ private void executeActionAsync(
 		operationFuture.whenCompleteAsync(
 			(t, throwable) -> {
 				if (throwable != null) {
-					if (throwable instanceof CancellationException) {
-						result.completeExceptionally(throwable);
-					} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+					if (
+						throwable.getCause() instanceof UnknownKvStateIdException ||
--- End diff --

No, an `UnknownKvStateIdException` is thrown in this case. See `KvStateServerHandler`.




[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253770#comment-16253770 ]

ASF GitHub Bot commented on FLINK-8063:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184384
  

The `UnknownKvStateLocationException` is thrown when we do not have a server for a particular `keyGroup`.
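
Putting the two comments together, a self-contained sketch of the retry decision (the exception types are stubbed locally; the real ones live in the queryable-state modules, see `KvStateClientProxyHandler`): while a job is still deploying, both conditions are transient, so the proxy retries instead of failing the client's future.

public class RetryDecisionSketch {
    // Stubs standing in for the real Flink exceptions discussed above.
    static class UnknownKvStateIdException extends Exception {}        // server up, name not (yet) registered
    static class UnknownKvStateLocationException extends Exception {}  // no server (yet) for the key group

    static boolean shouldRetry(Throwable throwable) {
        Throwable cause = throwable.getCause();
        // Both conditions may resolve themselves once deployment finishes,
        // so they trigger a retry rather than an exceptional completion.
        return cause instanceof UnknownKvStateIdException
                || cause instanceof UnknownKvStateLocationException;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new RuntimeException(new UnknownKvStateIdException())));     // true
        System.out.println(shouldRetry(new RuntimeException(new IllegalStateException("other")))); // false
    }
}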




[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253781#comment-16253781 ]

ASF GitHub Bot commented on FLINK-8063:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5021#discussion_r151184658
  
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
@@ -1372,84 +1492,60 @@ public String fold(String accumulator, Tuple2<Integer, Long> value) throws Excep
 
 	//	General Utility Methods	//
 
-	private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries(
+	private static <K, S extends State, V> CompletableFuture<S> getKvState(
 			final QueryableStateClient client,
 			final JobID jobId,
 			final String queryName,
 			final K key,
 			final TypeInformation<K> keyTypeInfo,
 			final StateDescriptor<S, V> stateDescriptor,
-			final Time retryDelay,
 			final boolean failForUnknownKeyOrNamespace,
-			final ScheduledExecutor executor) {
-		return retryWithDelay(
-			() -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor),
-			NO_OF_RETRIES,
-			retryDelay,
-			executor,
-			failForUnknownKeyOrNamespace);
-	}
-
-	private static <T> CompletableFuture<T> retryWithDelay(
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
-
-		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
-
-		retryWithDelay(
-			resultFuture,
-			operation,
-			retries,
-			retryDelay,
-			scheduledExecutor,
-			failIfUnknownKeyOrNamespace);
+			final ScheduledExecutor executor) throws InterruptedException {
 
+		final CompletableFuture<S> resultFuture = new CompletableFuture<>();
+		getKvStateIgnoringCertainExceptions(
+			resultFuture, client, jobId, queryName, key, keyTypeInfo,
+			stateDescriptor, failForUnknownKeyOrNamespace, executor);
 		return resultFuture;
 	}
 
-	public static <T> void retryWithDelay(
-			final CompletableFuture<T> resultFuture,
-			final Supplier<CompletableFuture<T>> operation,
-			final int retries,
-			final Time retryDelay,
-			final ScheduledExecutor scheduledExecutor,
-			final boolean failIfUnknownKeyOrNamespace) {
+	private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
+			final CompletableFuture<S> resultFuture,
+			final QueryableStateClient client,
+			final JobID jobId,
+			final String queryName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor,
+			final boolean failForUnknownKeyOrNamespace,
+			final ScheduledExecutor executor) throws InterruptedException {
 
 		if (!resultFuture.isDone()) {
-			final CompletableFuture<T> operationResultFuture = operation.get();
-			operationResultFuture.whenCompleteAsync(
-				(t, throwable) -> {
-					if (throwable != null) {
-						if (throwable.getCause() instanceof CancellationException) {
-							resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation future was cancelled.", throwable.getCause()));
-						} else if (throwable.getCause() instanceof AssertionError ||
-								(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof

[jira] [Commented] (FLINK-8041) Recommended Improvements for Gelly's Connected Components Algorithm

2017-11-15 Thread Greg Hogan (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253784#comment-16253784 ]

Greg Hogan commented on FLINK-8041:
---

Currently the vertex value can be any implementation of {{Comparable}}, not just {{Long}}. I can see the benefit of assigning vertices a label as with {{DataSetUtils#zipWithUniqueId}}. The second proposal looks to carry a cost or a limit on scalability, but it would be great to start a discussion on a PR.
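
A small sketch of the labeling idea mentioned above, using `DataSetUtils#zipWithUniqueId` to derive unique Long labels rather than trusting user-supplied vertex values (the element type is illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class UniqueVertexLabels {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> vertexIds = env.fromElements("a", "b", "c");
        // Pairs every element with a unique (not necessarily consecutive) Long id,
        // which could serve as the initial component label for each vertex.
        DataSet<Tuple2<Long, String>> labeled = DataSetUtils.zipWithUniqueId(vertexIds);
        labeled.print();
    }
}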

> Recommended Improvements for Gelly's Connected Components Algorithm
> ---
>
> Key: FLINK-8041
> URL: https://issues.apache.org/jira/browse/FLINK-8041
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.3.2
> Environment: Linux, IntelliJ IDEA
>Reporter: Christos Hadjinikolis
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> At the moment, the ConnectedComponents algorithm that comes with Flink's 
> native Graph API (Gelly) has two issues:
> 1. It relies on the user to provide correct values in the vertices 
> DataSet. Based on how the algorithm works, these values must be of type Long 
> and be unique for every vertex. If the user provides the same value for 
> every vertex (e.g. 1) the algorithm still works, but as those values are used 
> for the identification of the different connected components, one will end up 
> with a single connected component and will have no clue as to why this 
> happened. This can easily be fixed in two ways: either by checking that the 
> values that appear alongside vertex ids are unique and informing the user if 
> not, or by generating those values for every vertex before the algorithm is 
> run. I have a running implementation of the second way and I really think it 
> is an appropriate solution to this problem.
> 2. Once the connected components are identified, one has to apply additional 
> transformations and actions to find out which is the biggest or the order of 
> the connected components in terms of their size. Alternatively, the algorithm 
> can be implemented so that numerical ids that are given to each component 
> reflect their ranking when ordered based on size, e.g. connected component 1 
> will be the biggest, connected component 2 should be the second biggest and 
> so on.  I have also solved this and I think it would make a nice addition. 





[jira] [Updated] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader

2017-11-15 Thread Timo Walther (JIRA)

 [ https://issues.apache.org/jira/browse/FLINK-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-7490:

Fix Version/s: 1.5.0
               1.3.3

> UDF Agg throws Exception when flink-table is loaded with AppClassLoader
> ---
>
> Key: FLINK-7490
> URL: https://issues.apache.org/jira/browse/FLINK-7490
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Miguel Rui Pereira Marques
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3, 1.5.0
>
>
> When a UDF aggregation for the Batch Table API is defined in the 
> FlinkUserCodeClassLoader and the Table API itself is loaded in the 
> AppClassLoader (the jar is included in the lib directory) this exception is 
> triggered:
> {panel:title=Exception}
> java.lang.Exception: The user defined 'open()' method caused an exception: 
> Table program cannot be compiled. This is a bug. Please file an issue.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.compile(DataSetAggFunction.scala:35)
>   at 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.open(DataSetAggFunction.scala:49)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>   ... 3 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 13: 
> Cannot determine simple type name "org"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
> ...
> {panel}
> Upon inspecting the code I think this may be due to the usage of 
> 'getClass.getClassLoader' instead of 
> 'getRuntimeContext.getUserCodeClassLoader' as an argument to 'compile' in the 
> method 'open' of class 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.
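
A self-contained sketch of why the choice of classloader matters (the user class name below is hypothetical): a framework class loaded from lib/ resolves classes through the AppClassLoader, which cannot see classes shipped only in the user jar, while the user-code classloader can.

public class ClassLoaderSketch {
    public static void main(String[] args) {
        // getClass().getClassLoader() on a framework class returns whichever
        // loader loaded the framework jar -- the AppClassLoader when it sits in lib/.
        ClassLoader frameworkLoader = ClassLoaderSketch.class.getClassLoader();
        try {
            // A class that only exists in the user jar (hypothetical name):
            frameworkLoader.loadClass("com.example.user.MyAggFunction");
        } catch (ClassNotFoundException e) {
            // This is the failure mode behind "Cannot determine simple type name":
            // generated code is compiled against a loader that cannot see user classes.
            System.out.println("parent loader cannot resolve user classes: " + e.getMessage());
        }
    }
}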





[jira] [Resolved] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader

2017-11-15 Thread Timo Walther (JIRA)

 [ https://issues.apache.org/jira/browse/FLINK-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-7490.
---------------------------------
Resolution: Fixed

Fixed in 1.5: 59df4b75fa9c6754c89ea1922e05cfdb22e761da
Fixed in 1.4: 084ff68d5434805d9fc4208fd52f04c2201e362c
Fixed in 1.3: 4ca1b3e7b54fcff934ab4a33d59463e2ac4c975f



[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253814#comment-16253814 ]

ASF GitHub Bot commented on FLINK-7942:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5019
  
Looks good. +1 to merge


> NPE when apply FilterJoinRule
> -
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: Timo Walther
>
> Test case *testFilterRule1* fails due to an NPE 
> {code}
> java.lang.RuntimeException: Error while applying rule 
> FilterJoinRule:FilterJoinRule:filter, args 
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  
> AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  $3), 'c0')), 'c1'), 0)), 
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
>  $2),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>   at 
> org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>   testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>   at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>   at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2*, which has the same query written in SQL, works.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
> val util = batchTestUtil()
> val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
> val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
> val results = t1
>   .leftOuterJoin(t2, 'b === 'e)
>   .select('c, Merger('c, 'f) as 'c0)
>   .select(Merger('c, 'c0) as 'c1)
>   .where('c1 >= 0)
> val expected = unaryNode(
>   "DataSetCalc",
>   binaryNode(
> "DataSetJoin",
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(0),
>   term("select", "b", "c")
> ),
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(1),
>   term("select", "e", "f")
> ),
> term("where", "=(b, e)"),
> term("join", "b", "c", "e", "f"),
> term("joinType", "LeftOuterJoin")
>   ),
>   term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>   term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
> )
> util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
> val util = batchTestUtil()
> util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
> util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
> util.tableEnv.registerFunction("udf_test", Merger)
> val sql =
>   s"""
>  |select c1
>  |from (
>  |  select udf_test(c, c0) as c1
>  |  from (
>  |select c, udf_test(b, c) as c0
>  |  from
>  |  (select a, b, c
>  |from T1
>  |left outer join T2
>  |on T1.b = T2.e
>  |  ) tmp
>  |  ) tmp1
>  |) tmp2
>  |where c1 >= 0
>""".stripMargin
> val results = util.tableEnv.sqlQuery(sql)
> val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) 
> \n" +
>   "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
> joinType=[LeftOuterJoin])\n" +
>   "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
> 0)])\n" +
>   "DataSetScan(table=[[_DataSetTable_0]])\n" +
>   "DataSetCalc(select=[e])\n" +
>   

[GitHub] flink issue #5019: [FLINK-7942] [table] Reduce aliasing in RexNodes

2017-11-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5019
  
Looks good. +1 to merge


---


[GitHub] flink pull request #5019: [FLINK-7942] [table] Reduce aliasing in RexNodes

2017-11-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5019


---


[jira] [Commented] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16253840#comment-16253840 ]

ASF GitHub Bot commented on FLINK-7942:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5019



[jira] [Resolved] (FLINK-7942) NPE when apply FilterJoinRule

2017-11-15 Thread Timo Walther (JIRA)

 [ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-7942.
---------------------------------
   Resolution: Fixed
Fix Version/s: 1.5.0
               1.4.0

Fixed in 1.5: b6a2dc345f37c4b643789e98d02e23a022d31415
Fixed in 1.4: 13962e1ffda62218031bf426ee9c06146c7c5573

