[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown

2018-04-30 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459439#comment-16459439
 ] 

Gary Yao commented on FLINK-9196:
-

[~yuqi] Your patch would not work if the application is submitted in detached 
mode. When working on tickets, please assign the tickets to yourself in the future.

> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> -
>
> Key: FLINK-9196
> URL: https://issues.apache.org/jira/browse/FLINK-9196
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from 
> HDFS after the cluster shuts down.
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat}
> # Check contents of {{/user/hadoop/.flink/<application id>}} on HDFS after 
> the job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop        583 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop        332 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop   89779342 2018-04-02 17:08 /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop          0 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop       1939 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop       2331 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}
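
For illustration, a minimal sketch of the cleanup that is missing here, using only the stock Hadoop FileSystem API. This is an assumption-laden illustration (class name and structure are made up), not the actual change from PR #5938:

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingCleanupSketch {
    public static void main(String[] args) throws Exception {
        // The application's staging directory, as shown in the listing above.
        Path stagingDir = new Path("/user/hadoop/.flink/application_1523966184826_0016");
        FileSystem fs = FileSystem.get(new Configuration());
        // Recursive delete; this is what should happen when the cluster shuts down.
        if (!fs.delete(stagingDir, true)) {
            System.err.println("Could not delete " + stagingDir);
        }
    }
}
{code}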



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459436#comment-16459436
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185163799
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
 			// terminate the MiniDispatcher once we served the first JobResult successfully
-			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
--- End diff --

`throwable` isn't used. If `jobResultFuture` cannot be completed 
exceptionally, `thenAccept` should be used. 
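
As a standalone illustration of that suggestion (simplified stand-ins, not the actual MiniDispatcher code):

```
import java.util.concurrent.CompletableFuture;

public class ThenAcceptSketch {
    public static void main(String[] args) {
        CompletableFuture<String> jobResultFuture =
            CompletableFuture.completedFuture("job result");

        // whenComplete forces an unused Throwable parameter into the lambda:
        jobResultFuture.whenComplete((result, throwable) -> shutDown());

        // thenAccept suffices when the future never completes exceptionally:
        jobResultFuture.thenAccept(result -> shutDown());
    }

    private static void shutDown() {
        System.out.println("shutting down");
    }
}
```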


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully, as in this log (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - 

[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459437#comment-16459437
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185164034
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
 			// terminate the MiniDispatcher once we served the first JobResult successfully
-			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
+				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
+					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+				jobTerminationFuture.complete(status);
--- End diff --

I think the functional way would be:

```
jobTerminationFuture.complete(result.getSerializedThrowable()
	.map(serializedThrowable -> ApplicationStatus.FAILED)
	.orElse(ApplicationStatus.SUCCEEDED));
```
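
A runnable toy version of that mapping, with a stubbed JobResult standing in for the real class (the stub is hypothetical and only mirrors the one accessor used here):

```
import java.util.Optional;

public class StatusMappingSketch {
    enum ApplicationStatus { SUCCEEDED, FAILED }

    // Hypothetical stub with the same accessor shape as the real JobResult.
    static class JobResult {
        private final Throwable failureCause; // null on success

        JobResult(Throwable failureCause) {
            this.failureCause = failureCause;
        }

        Optional<Throwable> getSerializedThrowable() {
            return Optional.ofNullable(failureCause);
        }
    }

    static ApplicationStatus toStatus(JobResult result) {
        // FAILED if a throwable is present, SUCCEEDED otherwise.
        return result.getSerializedThrowable()
            .map(serializedThrowable -> ApplicationStatus.FAILED)
            .orElse(ApplicationStatus.SUCCEEDED);
    }

    public static void main(String[] args) {
        System.out.println(toStatus(new JobResult(null)));                    // SUCCEEDED
        System.out.println(toStatus(new JobResult(new RuntimeException()))); // FAILED
    }
}
```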


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully, as in this log (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: 

[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459435#comment-16459435
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185163871
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
@@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
 		}
 	}
 
+	@Override
+	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
+		terminationFuture.whenComplete((status, throwable) ->
--- End diff --

`throwable` isn't used. If `terminationFuture` cannot be completed 
exceptionally, `thenAccept` should be used. 
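
The same point as a self-contained sketch, assuming the termination future can never complete exceptionally (names are simplified from the entrypoint code):

```
import java.util.concurrent.CompletableFuture;

public class ShutdownActionsSketch {
    enum ApplicationStatus { SUCCEEDED, FAILED }

    static void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
        // thenAccept leaves no unused Throwable parameter in the callback.
        terminationFuture.thenAccept(status ->
            System.out.println("deregistering application as " + status));
    }

    public static void main(String[] args) {
        CompletableFuture<ApplicationStatus> terminationFuture = new CompletableFuture<>();
        registerShutdownActions(terminationFuture);
        terminationFuture.complete(ApplicationStatus.SUCCEEDED);
    }
}
```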


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully, as in this log (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache   

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-04-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185164034
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
 			// terminate the MiniDispatcher once we served the first JobResult successfully
-			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
+				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
+					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+				jobTerminationFuture.complete(status);
--- End diff --

I think the functional way would be:

```
jobTerminationFuture.complete(result.getSerializedThrowable()
	.map(serializedThrowable -> ApplicationStatus.FAILED)
	.orElse(ApplicationStatus.SUCCEEDED));
```


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-04-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185163871
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
@@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
 		}
 	}
 
+	@Override
+	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
+		terminationFuture.whenComplete((status, throwable) ->
--- End diff --

`throwable` isn't used. If `terminationFuture` cannot be completed 
exceptionally, `thenAccept` should be used. 


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-04-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185163799
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
 		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
 			// terminate the MiniDispatcher once we served the first JobResult successfully
-			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
+			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
--- End diff --

`throwable` isn't used. If `jobResultFuture` cannot be completed 
exceptionally, `thenAccept` should be used. 


---


[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-04-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7775:
--
Description: 
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used.
We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}

The method of PermanentBlobCache is not used.
We should remove it.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-04-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9091:
--
Description: 
Got the following when building flink-json module:

{code}
[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
failed with message:
Failed while enforcing releasability. See above detailed error message.
...
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
(dependency-convergence) on project flink-json: Some Enforcer rules have 
failed.   Look above for specific messages explaining why the rule failed. -> 
[Help 1]
{code}

  was:
Got the following when building flink-json module:
{code}
[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
failed with message:
Failed while enforcing releasability. See above detailed error message.
...
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
(dependency-convergence) on project flink-json: Some Enforcer rules have 
failed.   Look above for specific messages explaining why the rule failed. -> 
[Help 1]
{code}


> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459237#comment-16459237
 ] 

ASF GitHub Bot commented on FLINK-8286:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185135031
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-		run(args);
+		try {
+			SecurityUtils.getInstalledContext().runSecured(
+				YarnTaskExecutorRunnerFactory.create(System.getenv()));
+		} catch (Exception e) {
+			LOG.error("Exception occurred while launching Task Executor runner", e);
+			throw new RuntimeException(e);
+		}
 	}
 
 	/**
-	 * The instance entry point for the YARN task executor. Obtains user group information and calls
-	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
-	 * privileged action.
+	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 	 *
-	 * @param args The command line arguments.
+	 * @param envs environment variables.
 	 */
-	private static void run(String[] args) {
-		try {
-			LOG.debug("All environment variables: {}", ENV);
+	@VisibleForTesting
+	protected static Runner create(Map<String, String> envs) {
+		LOG.debug("All environment variables: {}", envs);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			LOG.info("Current working/local Directory: {}", localDirs);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			LOG.info("Current working Directory: {}", currDir);
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+		final Configuration configuration;
+		try {
+			configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
+		} catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
+			return null;
+		}
 
-			// configure local directory
-			if (configuration.contains(CoreOptions.TMP_DIRS)) {
-				LOG.info("Overriding YARN's temporary file directories with those " +
-					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-			}
-			else {
-				LOG.info("Setting directories for temporary files to: {}", localDirs);
-				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-			}
-
-			// tell akka to die in case of an error
-			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else {
+			LOG.info("Setting directories for

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185134529
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-		run(args);
+		try {
+			SecurityUtils.getInstalledContext().runSecured(
+				YarnTaskExecutorRunnerFactory.create(System.getenv()));
+		} catch (Exception e) {
+			LOG.error("Exception occurred while launching Task Executor runner", e);
+			throw new RuntimeException(e);
+		}
 	}
 
 	/**
-	 * The instance entry point for the YARN task executor. Obtains user group information and calls
-	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
-	 * privileged action.
+	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 	 *
-	 * @param args The command line arguments.
+	 * @param envs environment variables.
 	 */
-	private static void run(String[] args) {
-		try {
-			LOG.debug("All environment variables: {}", ENV);
+	@VisibleForTesting
+	protected static Runner create(Map<String, String> envs) {
+		LOG.debug("All environment variables: {}", envs);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			LOG.info("Current working/local Directory: {}", localDirs);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			LOG.info("Current working Directory: {}", currDir);
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+		final Configuration configuration;
+		try {
+			configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
+		} catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
--- End diff --

Good point. Added exceptions to method signature and let caller handle it.
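
For illustration, the shape of that change as a self-contained sketch. All names here are hypothetical stand-ins (the real method is YarnTaskExecutorRunnerFactory#create); the point is to declare the failure instead of catching Throwable and returning null:

```
public class FactorySketch {
    static class ConfigLoadException extends Exception {
        ConfigLoadException(Throwable cause) { super(cause); }
    }

    // The caller now sees the failure and decides how to abort.
    static Runnable create(String currDir) throws ConfigLoadException {
        final Object configuration;
        try {
            configuration = loadConfiguration(currDir); // stand-in loader
        } catch (RuntimeException e) {
            throw new ConfigLoadException(e);
        }
        return () -> System.out.println("running with " + configuration);
    }

    static Object loadConfiguration(String dir) {
        if (dir == null) throw new RuntimeException("no config directory");
        return "configuration from " + dir;
    }

    public static void main(String[] args) throws Exception {
        create("/tmp/conf").run();
    }
}
```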


---


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459238#comment-16459238
 ] 

ASF GitHub Bot commented on FLINK-8286:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185134529
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-		run(args);
+		try {
+			SecurityUtils.getInstalledContext().runSecured(
+				YarnTaskExecutorRunnerFactory.create(System.getenv()));
+		} catch (Exception e) {
+			LOG.error("Exception occurred while launching Task Executor runner", e);
+			throw new RuntimeException(e);
+		}
 	}
 
 	/**
-	 * The instance entry point for the YARN task executor. Obtains user group information and calls
-	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
-	 * privileged action.
+	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 	 *
-	 * @param args The command line arguments.
+	 * @param envs environment variables.
 	 */
-	private static void run(String[] args) {
-		try {
-			LOG.debug("All environment variables: {}", ENV);
+	@VisibleForTesting
+	protected static Runner create(Map<String, String> envs) {
+		LOG.debug("All environment variables: {}", envs);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			LOG.info("Current working/local Directory: {}", localDirs);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			LOG.info("Current working Directory: {}", currDir);
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+		final Configuration configuration;
+		try {
+			configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
+		} catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
--- End diff --

Good point. Added exceptions to method signature and let caller handle it.


> Fix Flink-Yarn-Kerberos integration for FLIP-6
> --
>
> Key: FLINK-8286
> URL: https://issues.apache.org/jira/browse/FLINK-8286
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos in Flip-6 is broken. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185135031
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
 		SignalHandler.register(LOG);
 		JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-		run(args);
+		try {
+			SecurityUtils.getInstalledContext().runSecured(
+				YarnTaskExecutorRunnerFactory.create(System.getenv()));
+		} catch (Exception e) {
+			LOG.error("Exception occurred while launching Task Executor runner", e);
+			throw new RuntimeException(e);
+		}
 	}
 
 	/**
-	 * The instance entry point for the YARN task executor. Obtains user group information and calls
-	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
-	 * privileged action.
+	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 	 *
-	 * @param args The command line arguments.
+	 * @param envs environment variables.
 	 */
-	private static void run(String[] args) {
-		try {
-			LOG.debug("All environment variables: {}", ENV);
+	@VisibleForTesting
+	protected static Runner create(Map<String, String> envs) {
+		LOG.debug("All environment variables: {}", envs);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			LOG.info("Current working/local Directory: {}", localDirs);
+		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
 
-			final String currDir = ENV.get(Environment.PWD.key());
-			LOG.info("Current working Directory: {}", currDir);
+		final String currDir = envs.get(Environment.PWD.key());
+		LOG.info("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
 
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
+		final Configuration configuration;
+		try {
+			configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
+		} catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
+			return null;
+		}
 
-			// configure local directory
-			if (configuration.contains(CoreOptions.TMP_DIRS)) {
-				LOG.info("Overriding YARN's temporary file directories with those " +
-					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
-			}
-			else {
-				LOG.info("Setting directories for temporary files to: {}", localDirs);
-				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
-			}
-
-			// tell akka to die in case of an error
-			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+		// configure local directory
+		if (configuration.contains(CoreOptions.TMP_DIRS)) {
+			LOG.info("Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
+		}
+		else {
+			LOG.info("Setting directories for temporary files to: {}", localDirs);
+			configuration.setString(CoreOptions.TMP_DIRS, localDirs);
+		}
 
-			String keytabPath = null;
-			if (remoteKeytabPath

[jira] [Commented] (FLINK-9256) NPE in SingleInputGate#updateInputChannel() for non-credit based flow control

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459079#comment-16459079
 ] 

ASF GitHub Bot commented on FLINK-9256:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5914


> NPE in SingleInputGate#updateInputChannel() for non-credit based flow control
> -
>
> Key: FLINK-9256
> URL: https://issues.apache.org/jira/browse/FLINK-9256
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{SingleInputGate#updateInputChannel()}} fails to update remote partitions 
> without credit based flow control due to a {{NullPointerException}} from 
> {{networkBufferPool == null}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9214) YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459076#comment-16459076
 ] 

ASF GitHub Bot commented on FLINK-9214:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5892


> YarnClient should be stopped in 
> YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal
> 
>
> Key: FLINK-9214
> URL: https://issues.apache.org/jira/browse/FLINK-9214
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal 
> creates YarnClient without stopping it at the end of the test.
> YarnClient yc should be stopped before returning.
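
The fix being asked for is the usual try/finally pattern; a generic, self-contained sketch with a hypothetical Service standing in for YarnClient:

{code}
public class StopClientSketch {
    // Hypothetical stand-in for org.apache.hadoop.yarn.client.api.YarnClient.
    static class Service {
        void start() { System.out.println("client started"); }
        void stop()  { System.out.println("client stopped"); }
    }

    public static void main(String[] args) {
        Service yc = new Service();
        yc.start();
        try {
            // ... test body that uses yc ...
        } finally {
            yc.stop(); // always stop the client before returning
        }
    }
}
{code}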



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9274) Add thread name to Kafka Partition Discovery

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459080#comment-16459080
 ] 

ASF GitHub Bot commented on FLINK-9274:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5942


> Add thread name to Kafka Partition Discovery
> 
>
> Key: FLINK-9274
> URL: https://issues.apache.org/jira/browse/FLINK-9274
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> For debugging, threads should have names to filter on and get a quick 
> overview. The Kafka partition discovery thread(s) currently don't have any 
> name assigned.
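
A generic illustration of the improvement (the thread name below is made up; the actual format is whatever PR #5942 defines):

{code}
public class ThreadNameSketch {
    public static void main(String[] args) throws InterruptedException {
        Runnable discoveryLoop = () ->
            System.out.println(Thread.currentThread().getName() + " running");

        // A named thread is easy to filter in thread dumps.
        Thread discovery = new Thread(discoveryLoop,
            "Kafka Partition Discovery for MySource (1/4)");
        discovery.start();
        discovery.join();
    }
}
{code}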



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459077#comment-16459077
 ] 

ASF GitHub Bot commented on FLINK-9196:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5938


> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> -
>
> Key: FLINK-9196
> URL: https://issues.apache.org/jira/browse/FLINK-9196
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from 
> HDFS after the cluster shuts down.
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat}
> # Check contents of {{/user/hadoop/.flink/<application id>}} on HDFS after 
> the job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop        583 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop        332 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop   89779342 2018-04-02 17:08 /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop          0 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop       1939 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop       2331 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9275) Set more distinctive output flusher thread names

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459078#comment-16459078
 ] 

ASF GitHub Bot commented on FLINK-9275:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5943


> Set more distinctive output flusher thread names
> 
>
> Key: FLINK-9275
> URL: https://issues.apache.org/jira/browse/FLINK-9275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> All output flusher threads are named "OutputFlusher", although at the only place 
> where the {{StreamWriter}} is initialized we already have the task name at hand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5914: [FLINK-9256][network] fix NPE in SingleInputGate#u...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5914


---


[GitHub] flink pull request #5916: [hotfix][tests] remove redundant rebalance in Succ...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5916


---


[GitHub] flink pull request #5938: [FLINK-9196][flip6, yarn] Cleanup application file...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5938


---


[GitHub] flink pull request #5892: [FLINK-9214] YarnClient should be stopped in YARNS...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5892


---


[GitHub] flink pull request #5942: [FLINK-9274][kafka] add thread name for partition ...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5942


---


[GitHub] flink pull request #5924: [hotfix][README.md] Update building prerequisites

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5924


---


[GitHub] flink pull request #5943: [FLINK-9275][streaming] add taskName to the output...

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5943


---


[jira] [Closed] (FLINK-9279) PythonPlanBinderTest flakey

2018-04-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9279.
---
Resolution: Duplicate

> PythonPlanBinderTest flakey
> ---
>
> Key: FLINK-9279
> URL: https://issues.apache.org/jira/browse/FLINK-9279
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Priority: Critical
>
> The test fails while trying to create the parent directory {{/tmp/flink}}. 
> That happens if a file with that name already exists.
> The Python Plan binder apparently uses a fixed name for the temp directory, but 
> should use a statistically unique random name instead.
> Full test run log: https://api.travis-ci.org/v3/job/373120733/log.txt
> Relevant Stack Trace
> {code}
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:898)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Mkdirs failed to create /tmp/flink
>   at 
> org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
>   at 
> org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:161)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 59.839 sec 
> <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
> testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
> Time elapsed: 14.912 sec  <<< FAILURE!
> java.lang.AssertionError: Error while calling the test program: Job execution 
> failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:161)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at 
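
A minimal sketch of the suggested direction: derive the output location from a unique temp directory instead of the fixed {{/tmp/flink}} (illustrative only, not the actual PythonPlanBinder change):

{code}
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirSketch {
    public static void main(String[] args) throws Exception {
        // Collision-free: each run gets its own randomly named directory.
        Path dir = Files.createTempDirectory("flink-python-");
        System.out.println("writing results under " + dir);
    }
}
{code}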

[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459004#comment-16459004
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5944
  
I will try it out.


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully, as in this log (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...

2018-04-30 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5944
  
I will try it out.


---


[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458986#comment-16458986
 ] 

ASF GitHub Bot commented on FLINK-9222:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5900
  
I'm pretty sure I got all this covered by using a separate "configuration": 
`myShadowJar` - not the standard dependency configuration*, but it does the trick:
Everything added to `myShadowJar` inside the `dependencies` gets added to 
the shaded jar (here: shadowJar), except for the explicit excludes which work 
with transitive dependencies as well (defined in the lines starting with 
`myShadowJar.exclude group:` where I included the same things as in the maven 
shade configuration of the quickstart). All user-code dependencies should be 
put into `myShadowJar` - maybe I should make this even more explicit in the 
gradle build file.

- nothing is relocated - there's stuff packed into the jar and other stuff 
that isn't, that's it :) (should be the same - I did compare the jar with the 
one from maven with and without the kafka connector dependency as a test)

- Flink core dependencies are excluded from the uber jar by not putting 
them into `myShadowJar`

- with the trick of using `myShadowJar`, not only is IntelliJ able to run 
the job and the tests, it also runs from commandline

* Unfortunately, I could not use the `shadow`/`compileOnly` dependency 
configurations which are standard for this in gradle because then the program 
would not run in IntelliJ or via `gradle run`. It would expect the environment 
to provide the dependencies which it does not there. Alternatives/fixes for 
this broke the transitive dependency exclusion which is, however, scheduled for 
some future version of the gradle shadow plugin. There are a lot of enhancement 
requests in this regard, e.g. https://github.com/johnrengelman/shadow/issues/159


> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website, Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users tend to get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...

2018-04-30 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5900
  
I'm pretty sure I got all this covered by using a separate "configuration": 
`myShadowJar` - not the standard dependency configuration*, but it does the trick:
Everything added to `myShadowJar` inside the `dependencies` gets added to 
the shaded jar (here: shadowJar), except for the explicit excludes which work 
with transitive dependencies as well (defined in the lines starting with 
`myShadowJar.exclude group:` where I included the same things as in the maven 
shade configuration of the quickstart). All user-code dependencies should be 
put into `myShadowJar` - maybe I should make this even more explicit in the 
gradle build file.

- nothing is relocated - there's stuff packed into the jar and other stuff 
that isn't, that's it :) (should be the same - I did compare the jar with the 
one from Maven, with and without the Kafka connector dependency, as a test)

- Flink core dependencies are excluded from the uber jar by not putting 
them into `myShadowJar`

- with the trick of using `myShadowJar`, not only is IntelliJ able to run 
the job and the tests, it also runs from the command line

* Unfortunately, I could not use the `shadow`/`compileOnly` dependency 
configurations which are standard for this in Gradle, because then the program 
would not run in IntelliJ or via `gradle run`: it would expect the environment 
to provide the dependencies, which it does not there. Alternatives/fixes for 
this broke the transitive dependency exclusion; proper support for that is, 
however, scheduled for some future version of the Gradle shadow plugin. There 
are a lot of enhancement requests in this regard, e.g. 
https://github.com/johnrengelman/shadow/issues/159


---


[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458960#comment-16458960
 ] 

ASF GitHub Bot commented on FLINK-9222:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5900
  
Nice addition!

A few things I would like to double check on the quickstart configuration 
(I am not fluent enough in Gradle):

  - We do not need to hide/shade any dependencies in the user code. In 
Maven, we use the shade plugin, but only to build an uber jar, not to actually 
relocate dependencies. Is that the same in the Gradle quickstart?

  - The Flink core dependencies need to be in a scope equivalent to 
"provided", so they do not end up in the uber jar. Can we do something similar 
in Gradle? This has been a frequent source of unnecessarily bloated application 
jars.

  - The Maven quickstart template uses a trick to make sure that the 
provided dependencies are still in the classpath when we run the program in the 
IDE: A profile that activates in IDEA (by a property variable) and alters the 
scope from *provided* to *compile*. Not sure if that is strictly necessary, but 
may be helpful.


> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website, Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5900
  
Nice addition!

A few things I would like to double check on the quickstart configuration 
(I am not fluent enough in Gradle):

  - We do not need to hide/shade any dependencies in the user code. In 
Maven, we use the shade plugin, but only to build an uber jar, not to actually 
relocate dependencies. Is that the same in the Gradle quickstart?

  - The Flink core dependencies need to be in a scope equivalent to 
"provided", so they do not end up in the uber jar. Can we do something similar 
in Gradle? This has been a frequent source of unnecessarily bloated application 
jars.

  - The Maven quickstart template uses a trick to make sure that the 
provided dependencies are still in the classpath when we run the program in the 
IDE: A profile that activates in IDEA (by a property variable) and alters the 
scope from *provided* to *compile*. Not sure if that is strictly necessary, but 
may be helpful.


---


[jira] [Commented] (FLINK-9256) NPE in SingleInputGate#updateInputChannel() for non-credit based flow control

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458923#comment-16458923
 ] 

ASF GitHub Bot commented on FLINK-9256:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5914
  
Change looks very good, thanks!
Merging this...

We can probably remove most of this code again later, once we drop the 
non-credit-based code paths in the next releases. But that still makes this a 
necessary fix for now...


> NPE in SingleInputGate#updateInputChannel() for non-credit based flow control
> -
>
> Key: FLINK-9256
> URL: https://issues.apache.org/jira/browse/FLINK-9256
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{SingleInputGate#updateInputChannel()}} fails to update remote partitions 
> without credit based flow control due to a {{NullPointerException}} from 
> {{networkBufferPool == null}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5914: [FLINK-9256][network] fix NPE in SingleInputGate#updateIn...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5914
  
Change looks very good, thanks!
Merging this...

We can probably remove most of this code again later, once we drop the 
non-credit-based code paths in the next releases. But that still makes this a 
necessary fix for now...


---


[GitHub] flink issue #5916: [hotfix][tests] remove redundant rebalance in SuccessAfte...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5916
  
Merging...


---


[jira] [Commented] (FLINK-9274) Add thread name to Kafka Partition Discovery

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458916#comment-16458916
 ] 

ASF GitHub Bot commented on FLINK-9274:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5942
  
Good fix, thanks!

Merging...


> Add thread name to Kafka Partition Discovery
> 
>
> Key: FLINK-9274
> URL: https://issues.apache.org/jira/browse/FLINK-9274
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> For debugging, threads should have names to filter on and get a quick 
> overview. The Kafka partition discovery thread(s) currently don't have any 
> name assigned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5942: [FLINK-9274][kafka] add thread name for partition discove...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5942
  
Good fix, thanks!

Merging...


---


[jira] [Commented] (FLINK-9275) Set more distinctive output flusher thread names

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458913#comment-16458913
 ] 

ASF GitHub Bot commented on FLINK-9275:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5943
  
Good fix, thanks, merging...


> Set more distinctive output flusher thread names
> 
>
> Key: FLINK-9275
> URL: https://issues.apache.org/jira/browse/FLINK-9275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> All output flusher threads are named "OutputFlusher", even though at the only 
> place where the {{StreamWriter}} is initialized we already have the task name 
> at hand.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5943: [FLINK-9275][streaming] add taskName to the output flushe...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5943
  
Good fix, thanks, merging...


---


[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458908#comment-16458908
 ] 

ASF GitHub Bot commented on FLINK-9196:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5938
  
Merging this...


> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> -
>
> Key: FLINK-9196
> URL: https://issues.apache.org/jira/browse/FLINK-9196
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from 
> HDFS after the cluster shuts down.
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat}
> # Check contents of {{/user/hadoop/.flink/}} on HDFS after 
> job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls 
> /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop583 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop332 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop   89779342 2018-04-02 17:08 
> /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop  0 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop   1939 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop   2331 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458906#comment-16458906
 ] 

ASF GitHub Bot commented on FLINK-9196:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5938#discussion_r185080370
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -570,6 +571,21 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
});
}
 
+   @Override
+   public void shutDownCluster() {
+   try {
+   sendRetryableRequest(
+   ShutdownHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   EmptyRequestBody.getInstance(),
+   isConnectionProblemException()).get();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Error while shutting down cluster", e);
--- End diff --

Throw the cause of the `ExecutionException`?
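
For example, something along these lines (only a sketch of the suggestion, not 
the actual patch):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ShutdownSketch {
    public void shutDownCluster(CompletableFuture<Void> responseFuture) {
        try {
            responseFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // rethrow the underlying failure instead of only logging it
            throw new RuntimeException("Error while shutting down cluster",
                    e.getCause());
        }
    }
}
```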


> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> -
>
> Key: FLINK-9196
> URL: https://issues.apache.org/jira/browse/FLINK-9196
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from 
> HDFS after the cluster shuts down.
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster 
> -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat}
> # Check contents of {{/user/hadoop/.flink/}} on HDFS after 
> job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls 
> /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop583 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop332 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop   89779342 2018-04-02 17:08 
> /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop  0 2018-04-17 14:54 
> /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop   1939 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop   2331 2018-04-02 15:37 
> /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5938: [FLINK-9196][flip6, yarn] Cleanup application files when ...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5938
  
Merging this...


---


[GitHub] flink pull request #5938: [FLINK-9196][flip6, yarn] Cleanup application file...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5938#discussion_r185080370
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -570,6 +571,21 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
});
}
 
+   @Override
+   public void shutDownCluster() {
+   try {
+   sendRetryableRequest(
+   ShutdownHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   EmptyRequestBody.getInstance(),
+   isConnectionProblemException()).get();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Error while shutting down cluster", e);
--- End diff --

Throw the cause of the `ExecutionException`?


---


[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458893#comment-16458893
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5944
  
The test failure is unrelated - just test flakiness.


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully like this one (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5944
  
The test failure is unrelated - just test flakiness.


---


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
 @GJL Briefly digging through the log, there are a few strange things 
happening:

  -  `YarnResourceManager` still has 8 pending requests even when 11 
containers are running:
```Received new container: container_1524853016208_0001_01_000184 - 
Remaining pending container requests: 8```

  - Some slots are requested and then the requests are cancelled again
  - In the end, one request is not fulfilled: 
`aeec2a9f010a187e04e31e6efd6f0f88`

Might be an inconsistency in either the `SlotManager` or the `SlotPool`.


---


[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458891#comment-16458891
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
 @GJL Briefly digging through the log, there are a few strange things 
happening:

  -  `YarnResourceManager` still has 8 pending requests even when 11 
containers are running:
```Received new container: container_1524853016208_0001_01_000184 - 
Remaining pending container requests: 8```

  - Some slots are requested and then the requests are cancelled again
  - In the end, one request is not fulfilled: 
`aeec2a9f010a187e04e31e6efd6f0f88`

Might be an inconsistency in either the `SlotManager` or the `SlotPool`.


> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9279) PythonPlanBinderTest flakey

2018-04-30 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9279:
---

 Summary: PythonPlanBinderTest flakey
 Key: FLINK-9279
 URL: https://issues.apache.org/jira/browse/FLINK-9279
 Project: Flink
  Issue Type: Bug
  Components: Python API, Tests
Affects Versions: 1.5.0
Reporter: Stephan Ewen


The test fails while trying to create the parent directory {{/tmp/flink}}. That 
happens if a file with that name already exists.

The Python plan binder apparently uses a fixed name for the temp directory, but 
should use a unique random name instead.
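
For example (just a sketch of a possible fix, using the JDK's temp-directory 
API rather than the actual plan binder code):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UniqueTempDir {
    public static void main(String[] args) throws IOException {
        // creates e.g. /tmp/flink-5040273457370950640, unique per run,
        // so a pre-existing /tmp/flink file can no longer collide
        Path tmpDir = Files.createTempDirectory("flink-");
        System.out.println("Using temp directory: " + tmpDir);
    }
}
{code}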

Full test run log: https://api.travis-ci.org/v3/job/373120733/log.txt

Relevant Stack Trace
{code}
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:898)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Mkdirs failed to create /tmp/flink
at 
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at 
org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
at 
org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:161)
at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 59.839 sec <<< 
FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
Time elapsed: 14.912 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: Job execution 
failed.
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:161)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 

[jira] [Created] (FLINK-9278) Allow restore savepoint with some SQL queries added/removed

2018-04-30 Thread Adrian Hains (JIRA)
Adrian Hains created FLINK-9278:
---

 Summary: Allow restore savepoint with some SQL queries 
added/removed
 Key: FLINK-9278
 URL: https://issues.apache.org/jira/browse/FLINK-9278
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Adrian Hains


We are running a Flink job that contains multiple SQL queries. This is 
configured by calling sqlQuery(String) one time for each SQL query, on a single 
instance of StreamTableEnvironment. The queries are simple aggregations with a 
tumble window.

Currently I can configure my environment with queries Q1, Q2, and Q3, create a 
savepoint, and restart the job from that savepoint if the same set of SQL 
queries are used.

If I remove some queries and add some others, Q2, Q4, and Q3, I am unable to 
restart the job from the same savepoint. This behavior is expected, as the 
documentation clearly describes that the operator IDs are generated if they are 
not explicitly defined, and they cannot be explicitly defined when using Flink 
SQL.

I would like to be able to specify a scoping operator id prefix when 
registering a SQL query to a StreamTableEnvironment. This can then be used to 
programmatically generate unique IDs for each of the operators created to 
execute the SQL queries. For example, if I specify a prefix of "ID:Q2:" for my 
Q2 query, and I restart the job with an identical SQL query for this prefix, 
then I would be able to restore the state for this query even in the presence 
of other queries being added or removed to the job graph.
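
For illustration, such an API could look like the following (purely 
hypothetical sketch; this overload does not exist in Flink today, and the 
table/column names are made up):

{code}
// hypothetical sqlQuery overload: the first argument is an operator-ID
// prefix that scopes all operators generated for this query
Table q2 = tableEnv.sqlQuery("ID:Q2:",
    "SELECT user, COUNT(*) FROM clicks " +
    "GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), user");
{code}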



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9277) Reduce noisiness of SlotPool logging

2018-04-30 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458828#comment-16458828
 ] 

Chesnay Schepler commented on FLINK-9277:
-

Kind of a duplicate of FLINK-9215. In this 
[PR|https://github.com/apache/flink/pull/5879] we've been thinking of adding a 
separate exception class used for the expected life-cycle of resources, which 
doesn't print a stacktrace.
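
Roughly along these lines (only a sketch; the class name is illustrative, not 
an existing Flink class):

{code}
/** Signals an expected resource life-cycle event; carries no stack trace. */
public class ResourceLifecycleException extends Exception {
    public ResourceLifecycleException(String message) {
        // writableStackTrace = false: no stack trace is captured or printed
        super(message, null, false, false);
    }
}
{code}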

> Reduce noisiness of SlotPool logging
> 
>
> Key: FLINK-9277
> URL: https://issues.apache.org/jira/browse/FLINK-9277
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
>
> The slot pool logs a very large amount of stack traces with meaningless 
> exceptions like {code}
> org.apache.flink.util.FlinkException: Release multi task slot because all 
> children have been released.
> {code}
> This makes log parsing very hard.
> For an example, see this log: 
> https://gist.githubusercontent.com/GJL/3b109db48734ff40103f47d04fc54bd3/raw/e3afc0ec3f452bad681e388016bcf799bba56f10/gistfile1.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458785#comment-16458785
 ] 

ASF GitHub Bot commented on FLINK-8286:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185050544
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
--- End diff --

Why is this exception being swallowed?


> Fix Flink-Yarn-Kerberos integration for FLIP-6
> --
>
> Key: FLINK-8286
> URL: https://issues.apache.org/jira/browse/FLINK-8286
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos in Flip-6 is broken. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458784#comment-16458784
 ] 

ASF GitHub Bot commented on FLINK-8286:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185052704
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   return null;
+   }
 
-   // configure local directory
-   if (configuration.contains(CoreOptions.TMP_DIRS)) {
-   LOG.info("Overriding YARN's temporary file 
directories with those " +
-   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-   }
-   else {
-   LOG.info("Setting directories for temporary 
files to: {}", localDirs);
-   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-   }
-
-   // tell akka to die in case of an error
-   
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+   // configure local directory
+   if (configuration.contains(CoreOptions.TMP_DIRS)) {
+   LOG.info("Overriding YARN's temporary file directories 
with those " +
+   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
+   }
+   else {
+   LOG.info("Setting directories for 

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185052704
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
+   return null;
+   }
 
-   // configure local directory
-   if (configuration.contains(CoreOptions.TMP_DIRS)) {
-   LOG.info("Overriding YARN's temporary file 
directories with those " +
-   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
-   }
-   else {
-   LOG.info("Setting directories for temporary 
files to: {}", localDirs);
-   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
-   }
-
-   // tell akka to die in case of an error
-   
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+   // configure local directory
+   if (configuration.contains(CoreOptions.TMP_DIRS)) {
+   LOG.info("Overriding YARN's temporary file directories 
with those " +
+   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
+   }
+   else {
+   LOG.info("Setting directories for temporary files to: 
{}", localDirs);
+   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
+   }
 
-   String keytabPath = null;
-   if (remoteKeytabPath 

[GitHub] flink pull request #5896: [FLINK-8286][Security] Fix kerberos security confi...

2018-04-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5896#discussion_r185050544
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java
 ---
@@ -73,61 +105,68 @@ public static void main(String[] args) {
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-   run(args);
+   try {
+   SecurityUtils.getInstalledContext().runSecured(
+   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
+   } catch (Exception e) {
+   LOG.error("Exception occurred while launching Task 
Executor runner", e);
+   throw new RuntimeException(e);
+   }
}
 
/**
-* The instance entry point for the YARN task executor. Obtains user 
group information and calls
-* the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
-* privileged action.
+* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
 *
-* @param args The command line arguments.
+* @param envs environment variables.
 */
-   private static void run(String[] args) {
-   try {
-   LOG.debug("All environment variables: {}", ENV);
+   @VisibleForTesting
+   protected static Runner create(Map<String, String> envs) {
+   LOG.debug("All environment variables: {}", envs);
 
-   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
-   LOG.info("Current working/local Directory: {}", 
localDirs);
+   final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+   final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+   LOG.info("Current working/local Directory: {}", localDirs);
 
-   final String currDir = ENV.get(Environment.PWD.key());
-   LOG.info("Current working Directory: {}", currDir);
+   final String currDir = envs.get(Environment.PWD.key());
+   LOG.info("Current working Directory: {}", currDir);
 
-   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
-   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
+   final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
 
-   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
-   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
+   final Configuration configuration;
+   try {
+   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
+   } catch (Throwable t) {
+   LOG.error(t.getMessage(), t);
--- End diff --

Why is this exception being swallowed?


---


[jira] [Created] (FLINK-9277) Reduce noisiness of SlotPool logging

2018-04-30 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9277:
---

 Summary: Reduce noisiness of SlotPool logging
 Key: FLINK-9277
 URL: https://issues.apache.org/jira/browse/FLINK-9277
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Stephan Ewen
Assignee: Till Rohrmann


The slot pool logs a very large amount of stack traces with meaningless 
exceptions like {code}
org.apache.flink.util.FlinkException: Release multi task slot because all 
children have been released.
{code}

This makes log parsing very hard.

For an example, see this log: 
https://gist.githubusercontent.com/GJL/3b109db48734ff40103f47d04fc54bd3/raw/e3afc0ec3f452bad681e388016bcf799bba56f10/gistfile1.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458775#comment-16458775
 ] 

ASF GitHub Bot commented on FLINK-9269:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5934
  
Thanks for your comments, @StephanEwen. If I am not misunderstanding, we 
don't need to duplicate the serializer now because we will have a dedicated 
optimization for it in the near future; I am `+1` for that. Then, what about 
the concurrency problem caused by the `stateTables`? It's an obvious bug that 
multiple threads can access the `stateTables` concurrently, and one of them 
can modify them... But so far, no users have reported that problem yet, maybe 
because most users are running the `RocksDBKeyedBackend` in production instead 
of the `HeapKeyedStateBackend`, so I think this is not an urgent bug. But it's 
still a bug - should it be fixed for 1.5?
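
Just to illustrate what I mean (a simplified sketch, not the actual backend 
code):

```java
import java.util.HashMap;
import java.util.Map;

public class StateTablesRace {
    private final Map<String, Object> stateTables = new HashMap<>();

    // task thread: registering a new state structurally modifies the map
    void registerState(String name) {
        stateTables.put(name, new Object());
    }

    // async snapshot thread: iterates the same map concurrently; this can
    // throw ConcurrentModificationException or observe inconsistent state
    void snapshot() {
        for (Map.Entry<String, Object> entry : stateTables.entrySet()) {
            // ... write entry.getKey() and its state to the checkpoint stream
        }
    }
}
```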


> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult performOperation() throws 
> Exception {
> // do something
>long[] keyGroupRangeOffsets = new 
> long[keyGroupRange.getNumberOfKeyGroups()];
>for (int keyGroupPos = 0; keyGroupPos < 
> keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>   int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>   keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>   outView.writeInt(keyGroupId);
>   for (Map.Entry> kvState : 
> stateTables.entrySet()) {
> // do something
>   }
> }
> // do something
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

2018-04-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5934
  
Thanks for your comments, @StephanEwen. If I am not misunderstanding, we 
don't need to duplicate the serializer now because we will have a dedicated 
optimization for it in the near future; I am `+1` for that. Then, what about 
the concurrency problem caused by the `stateTables`? It's an obvious bug that 
multiple threads can access the `stateTables` concurrently, and one of them 
can modify them... But so far, no users have reported that problem yet, maybe 
because most users are running the `RocksDBKeyedBackend` in production instead 
of the `HeapKeyedStateBackend`, so I think this is not an urgent bug. But it's 
still a bug - should it be fixed for 1.5?


---


[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6

2018-04-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458774#comment-16458774
 ] 

Aljoscha Krettek commented on FLINK-8286:
-

[~suez1224] Could you describe a bit what the problem was and how this fixes 
it? I'm also mostly interested in why {{YARNSessionFIFOSecuredITCase}} didn't 
fail, for example.

> Fix Flink-Yarn-Kerberos integration for FLIP-6
> --
>
> Key: FLINK-8286
> URL: https://issues.apache.org/jira/browse/FLINK-8286
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos in Flip-6 is broken. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9276) Improve error message when TaskManager fails

2018-04-30 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9276:
---

 Summary: Improve error message when TaskManager fails
 Key: FLINK-9276
 URL: https://issues.apache.org/jira/browse/FLINK-9276
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Stephan Ewen


When a TaskManager fails, we frequently get a message

{code}
org.apache.flink.util.FlinkException: Releasing TaskManager 
container_1524853016208_0001_01_000102
{code}

This message is misleading in that it sounds like an intended operation, when 
it really is a failure of a container that the {{ResourceManager}} reports to 
the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458767#comment-16458767
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5931
  
Hi @GJL, is it possible that the reason is the same as in the previous PR 
for this ticket? That is, even if the container is set up successfully and 
connects to the ResourceManager successfully, the TM may be killed before it 
connects to the JobManager. In this case, even though there are enough TMs, 
the JobManager won't fire any new request, and the ResourceManager doesn't 
know that the container it assigned to the JobManager has been killed either, 
so both the JobManager & the ResourceManager won't do anything but wait for 
the timeout... What do you think?


> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-04-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5931
  
Hi @GJL, is it possible that the reason is the same as in the previous PR 
for this ticket? That is, even if the container is set up successfully and 
connects to the ResourceManager successfully, the TM may be killed before it 
connects to the JobManager. In this case, even though there are enough TMs, 
the JobManager won't fire any new request, and the ResourceManager doesn't 
know that the container it assigned to the JobManager has been killed either, 
so both the JobManager & the ResourceManager won't do anything but wait for 
the timeout... What do you think?


---


[jira] [Updated] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-04-30 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9269:
--
Priority: Major  (was: Blocker)

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult performOperation() throws 
> Exception {
> // do something
>long[] keyGroupRangeOffsets = new 
> long[keyGroupRange.getNumberOfKeyGroups()];
>for (int keyGroupPos = 0; keyGroupPos < 
> keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>   int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>   keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>   outView.writeInt(keyGroupId);
>   for (Map.Entry> kvState : 
> stateTables.entrySet()) {
> // do something
>   }
> }
> // do something
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-04-30 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458756#comment-16458756
 ] 

Sihua Zhou commented on FLINK-9269:
---

Hi [~aljoscha], I didn't find a symptom there yet. But I think I can trigger 
the concurrency problem very easily, because it's an obvious bug that multiple 
threads can access the `stateTables` concurrently, and one of them can modify 
them... But so far, no users have reported the problem, maybe because most 
users are running the RocksDBKeyedBackend in production instead of the 
HeapKeyedStateBackend. I am now going to remove it from the blocker list.

> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult performOperation() throws 
> Exception {
> // do something
>long[] keyGroupRangeOffsets = new 
> long[keyGroupRange.getNumberOfKeyGroups()];
>for (int keyGroupPos = 0; keyGroupPos < 
> keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>   int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>   keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>   outView.writeInt(keyGroupId);
>   for (Map.Entry> kvState : 
> stateTables.entrySet()) {
> // do something
>   }
> }
> // do something
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458752#comment-16458752
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5944

[FLINK-8900] [yarn] Set correct application status when job is finished

## What is the purpose of the change

When finite Flink applications (batch jobs) are sent to YARN in detached 
mode, the final status is currently always the same, because the job's result 
is not passed to the logic that initiates the application shutdown.

This PR forwards the final job status via a future that is used to register 
the shutdown handlers.
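
As a rough illustration of that idea (hypothetical names, not the actual 
`MiniDispatcher` code), the shutdown path can be registered on a future derived 
from the job result:

```java
import java.util.concurrent.CompletableFuture;

public class TerminationFutureSketch {

    enum ApplicationStatus { SUCCEEDED, FAILED }

    static final class JobResult {
        final boolean success;
        JobResult(boolean success) { this.success = success; }
    }

    public static void main(String[] args) {
        CompletableFuture<JobResult> jobResultFuture = new CompletableFuture<>();

        // Derive the final application status from the job result instead of
        // hard-coding it, so YARN reports the real outcome.
        CompletableFuture<ApplicationStatus> terminationFuture =
            jobResultFuture.thenApply(result ->
                result.success ? ApplicationStatus.SUCCEEDED : ApplicationStatus.FAILED);

        // The shutdown handler is registered against the derived future.
        terminationFuture.thenAccept(status ->
            System.out.println("Deregistering application as " + status));

        jobResultFuture.complete(new JobResult(true)); // prints SUCCEEDED
    }
}
```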

## Brief change log

  - Introduce the `JobTerminationFuture` in the `MiniDispatcher`
  - 

## Verifying this change

```
bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048  
./examples/streaming/WordCount.jar
```

  - Run the batch job as described above on YARN so that it succeeds, and check 
that the final application status is successful.

  - Run the batch job with a parameter pointing to a non-existing input file on 
YARN, and check that the final application status is failed.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink yarn_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5944


commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7
Author: Stephan Ewen 
Date:   2018-04-30T07:55:50Z

[hotfix] [tests] Update log4j-test.properties

Brings the logging definition in sync with other projects.
Updates the classname for the suppressed logger in Netty to account for the new 
shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23
Author: Stephan Ewen 
Date:   2018-04-27T16:57:27Z

[FLINK-8900] [yarn] Set correct application status when job is finished




> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully like this one (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the 

[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-04-30 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5944

[FLINK-8900] [yarn] Set correct application status when job is finished

## What is the purpose of the change

When finite Flink applications (batch jobs) are sent to YARN in the 
detached mode, the final status is currently always the same, because the job's 
result is not passed to the logic that initiates the application shutdown.

This PR forwards the final job status via a future that is used to register 
the shutdown handlers.

## Brief change log

  - Introduce the `JobTerminationFuture` in the `MiniDispatcher`
  - 

## Verifying this change

```
bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048  
./examples/streaming/WordCount.jar
```

  - Run the batch job as described above on YARN so that it succeeds, and check 
that the final application status is successful.

  - Run the batch job with a parameter pointing to a non-existing input file on 
YARN, and check that the final application status is failed.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink yarn_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5944


commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7
Author: Stephan Ewen 
Date:   2018-04-30T07:55:50Z

[hotfix] [tests] Update log4j-test.properties

Brings the logging definition in sync with other projects.
Updates the classname for the suppressed logger in Netty to account for the new 
shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23
Author: Stephan Ewen 
Date:   2018-04-27T16:57:27Z

[FLINK-8900] [yarn] Set correct application status when job is finished




---


[jira] [Commented] (FLINK-9174) The type of state created in ProccessWindowFunction.proccess() is inconsistency

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458736#comment-16458736
 ] 

ASF GitHub Bot commented on FLINK-9174:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5847
  
@aljoscha Thanks for your review, I have addressed your comments.


> The type of state created in ProccessWindowFunction.proccess() is 
> inconsistency
> ---
>
> Key: FLINK-9174
> URL: https://issues.apache.org/jira/browse/FLINK-9174
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> The type of state created from windowState and globalState in 
> {{ProcessWindowFunction.process()}} is inconsistent. In detail,
> {code}
> context.windowState().getListState(); // return type is HeapListState or RocksDBListState
> context.globalState().getListState(); // return type is UserFacingListState
> {code}
> This cause the problem in the following code,
> {code}
> Iterable<T> iterableState = listState.get();
> if (iterableState.iterator().hasNext()) {
>   for (T value : iterableState) {
>     value.setRetracting(true);
>     collector.collect(value);
>   }
>   state.clear();
> }
> {code}
> If the {{listState}} is created from {{context.globalState()}}, it is fine, but 
> when it is created from {{context.windowState()}} this will cause an NPE. I hit 
> this in 1.3.2, but I found it also affects 1.5.0.
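
A minimal, self-contained sketch (with a stand-in for {{ListState#get()}}, not 
the actual Flink API) of a null-safe access pattern that avoids the NPE until 
the return types are made consistent:

{code:java}
import java.util.Collections;

public class NullSafeListStateAccess {

    // Stand-in for ListState#get(): window state may hand back null
    // instead of an empty Iterable.
    static Iterable<String> get(boolean empty) {
        return empty ? null : Collections.singletonList("value");
    }

    public static void main(String[] args) {
        Iterable<String> iterableState = get(true);

        // Guard against a null Iterable before iterating; avoids the NPE
        // described above when the state was created via windowState().
        if (iterableState != null && iterableState.iterator().hasNext()) {
            for (String value : iterableState) {
                System.out.println(value);
            }
        }
    }
}
{code}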



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5847: [FLINK-9174][datastream]Fix the type of state created in ...

2018-04-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5847
  
@aljoscha Thanks for your review, I have addressed your comments.


---


[jira] [Commented] (FLINK-9202) AvroSerializer should not be serializing the target Avro type class

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458728#comment-16458728
 ] 

Stephan Ewen commented on FLINK-9202:
-

I think the problem is different: The config snapshots should not be 
serializing serializers. It is perfectly fine for the serializer itself to hold 
the Avro type class.

I would either rename this issue or close it and create the proper issue as 
part of the state evolution umbrella issues.

> AvroSerializer should not be serializing the target Avro type class
> ---
>
> Key: FLINK-9202
> URL: https://issues.apache.org/jira/browse/FLINK-9202
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The {{AvroSerializer}} contains this field which is written when the 
> serializer is written into savepoints:
> [https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L78]
> This causes Avro schema evolution to not work properly, because Avro 
> generated classes have non-fixed serialVersionUIDs. Once a new Avro class is 
> generated with a new schema, that class can not be loaded on restore due to 
> incompatible UIDs, and thus the serializer can not be successfully 
> deserialized.
> A possible solution would be to only write the classname, and dynamically 
> load the class into a transient field.
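
A minimal sketch of that suggested direction (a hypothetical holder class, not 
the actual {{AvroSerializer}}): persist only the class name and re-load the 
class lazily into a transient field.

{code:java}
import java.io.Serializable;

public class ClassNameHolder implements Serializable {

    private static final long serialVersionUID = 1L;

    // Only the stable class name is written into savepoints.
    private final String typeClassName;

    // Re-loaded on demand after deserialization; never serialized itself.
    private transient Class<?> typeClass;

    public ClassNameHolder(Class<?> typeClass) {
        this.typeClassName = typeClass.getName();
        this.typeClass = typeClass;
    }

    public Class<?> getTypeClass() throws ClassNotFoundException {
        if (typeClass == null) {
            // In Flink this would use the user code class loader instead.
            typeClass = Class.forName(typeClassName);
        }
        return typeClass;
    }

    public static void main(String[] args) throws Exception {
        ClassNameHolder holder = new ClassNameHolder(String.class);
        System.out.println(holder.getTypeClass()); // class java.lang.String
    }
}
{code}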



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9234) Commons Logging is missing from shaded Flink Table library

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458725#comment-16458725
 ] 

ASF GitHub Bot commented on FLINK-9234:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
Can we actually get rid of `commons-configuration` in the table API?

All the commons packages with their weird long tail of not properly 
declared dependencies have become a bit of an anti-pattern to me over time...


> Commons Logging is missing from shaded Flink Table library
> --
>
> Key: FLINK-9234
> URL: https://issues.apache.org/jira/browse/FLINK-9234
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.2
> Environment: jdk1.8.0_172
> flink 1.4.2
> Mac High Sierra
>Reporter: Eron Wright 
>Assignee: Timo Walther
>Priority: Blocker
> Attachments: repro.scala
>
>
> The flink-table shaded library seems to be missing some classes from 
> {{org.apache.commons.logging}} that are required by 
> {{org.apache.commons.configuration}}.  Ran into the problem while using the 
> external catalog support, on Flink 1.4.2.
> See attached a repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/table/shaded/org/apache/commons/logging/Log
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala:55)
>   at 
> org.apache.flink.table.catalog.ExternalTableSourceUtil$.(ExternalTableSourceUtil.scala)
>   at 
> org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
>   at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
>   at 
> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
>   at 
> org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
>   at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
>   at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
>   at Repro$.main(repro.scala:17)
>   at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
Can we actually get rid of `commons-configuration` in the table API?

All the commons packages with their weird long tail of not properly 
declared dependencies have become a bit of an anti-pattern to me over time...


---


[jira] [Commented] (FLINK-9250) JoinTaskExternalITCase deadlocks on travis

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458723#comment-16458723
 ] 

Stephan Ewen commented on FLINK-9250:
-

I think this is not actually deadlocked. The test is not distributed and the 
stack trace indicates the thread is actually running at the point when the test 
fails.

The test timed out. It must have been either incredibly slow for some reason, or 
the clock jumped.

In any case, I would close this as a non-issue at this point. Please close if 
you agree.

> JoinTaskExternalITCase deadlocks on travis
> --
>
> Key: FLINK-9250
> URL: https://issues.apache.org/jira/browse/FLINK-9250
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/368995097



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5924
  
Thanks, merging this...


---


[jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458699#comment-16458699
 ] 

Stephan Ewen commented on FLINK-9268:
-

For the short run, it would be great to have a better exception message here.

For the long run, I think we need to contribute a {{ByteBuffer}} style 
interface to RocksDB.
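
As a sketch of the short-run idea (a hypothetical helper, not the actual RocksDB 
state backend code), assuming the cause is the int-sized value length reported 
by the JNI layer: detect the negative length and fail with a descriptive error.

{code:java}
public class RocksDbValueCheck {

    // Hypothetical guard: the JNI layer reports value sizes as int, so values
    // larger than Integer.MAX_VALUE bytes can surface as negative lengths.
    static byte[] allocateForValue(int reportedLength) {
        if (reportedLength < 0) {
            throw new IllegalStateException(
                "RocksDB returned a negative value size (" + reportedLength + "). "
                    + "This typically means the serialized state for a single key "
                    + "exceeds the JNI int limit; consider reducing per-key state.");
        }
        return new byte[reportedLength];
    }

    public static void main(String[] args) {
        System.out.println(allocateForValue(42).length); // prints 42
        // allocateForValue(-7) would throw with a descriptive message instead
        // of a bare NegativeArraySizeException.
    }
}
{code}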

> RockDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and 
> uses processing time. The job fails with the error given below after running 
> for a few hours. The only way to recover from this error is to cancel the job 
> and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458692#comment-16458692
 ] 

ASF GitHub Bot commented on FLINK-9269:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5934
  
Concerning serializer snapshots:
  - We need to move away from Java-serializing the serializers into the 
config snapshots anyway and should do that in the near future.
  - I think the config snapshot should be created once when the state is 
created, encoded as `byte[]`, and then we only write the bytes. That saves us 
from repeated work on every checkpoint and would also prevent concurrent access 
to the serializer for creating the snapshot (see the sketch below).
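
A minimal sketch of that suggestion (assumed types, plain Java serialization for 
brevity): serialize the config snapshot once at state creation and reuse the 
bytes on every checkpoint.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CachedConfigSnapshot {

    // Encoded once at state creation time and reused on every checkpoint, so
    // the serializer is never touched concurrently for snapshotting.
    private final byte[] snapshotBytes;

    public CachedConfigSnapshot(Serializable configSnapshot) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(configSnapshot); // done once, not per checkpoint
        }
        this.snapshotBytes = bos.toByteArray();
    }

    public byte[] bytes() {
        return snapshotBytes; // written as-is into each checkpoint
    }

    public static void main(String[] args) throws IOException {
        CachedConfigSnapshot snapshot = new CachedConfigSnapshot("dummy-config");
        System.out.println("snapshot size: " + snapshot.bytes().length + " bytes");
    }
}
{code}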


> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
>   // do something
>   long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
>   for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>     int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>     keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>     outView.writeInt(keyGroupId);
>     for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
>       // do something
>     }
>   }
>   // do something
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5934
  
Concerning serializer snapshots:
  - We need to move away from Java-serializing the serializers into the 
config snapshots anyway and should do that in the near future.
  - I think the config snapshot should be created once when the state is 
created, encoded as `byte[]`, and then we only write the bytes. That saves us 
from repeated work on every checkpoint and would also prevent concurrent access 
to the serializer for creating the snapshot.


---


[jira] [Commented] (FLINK-9270) Upgrade RocksDB to 5.11.3, and resolve concurrent test invocation problem of @RetryOnFailure

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458687#comment-16458687
 ] 

Stephan Ewen commented on FLINK-9270:
-

Do we know why the merge operator regressed again? Was there a problem with the 
current implementation?

> Upgrade RocksDB to 5.11.3, and resolve concurrent test invocation problem of 
> @RetryOnFailure
> 
>
> Key: FLINK-9270
> URL: https://issues.apache.org/jira/browse/FLINK-9270
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Upgrade RocksDB to 5.11.3 to pick up the latest bug fixes.
> Besides, I found that unit tests annotated with {{@RetryOnFailure}} will be 
> run concurrently if there is only a {{try}} clause without a {{catch}} 
> following. For example, sometimes 
> {{RocksDBPerformanceTest.testRocksDbMergePerformance()}} will actually be 
> running in 3 concurrent invocations, and multiple concurrent writes to RocksDB 
> result in errors. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9271) flink-1.4.2-bin-scala_2.11.tgz is not in gzip format

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458684#comment-16458684
 ] 

Stephan Ewen commented on FLINK-9271:
-

I just tried this and got 
{code}
file flink-1.4.2-bin-scala_2.11.tgz.1
flink-1.4.2-bin-scala_2.11.tgz.1: gzip compressed data
{code}

How did you get that particular file?

> flink-1.4.2-bin-scala_2.11.tgz is not in gzip format
> 
>
> Key: FLINK-9271
> URL: https://issues.apache.org/jira/browse/FLINK-9271
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.2
>Reporter: Martin Grigorov
>Priority: Minor
>
> Hi,
> I've just downloaded "Flink Without Hadoop" from 
> [http://flink.apache.org/downloads.html.]
> The name of the downloaded file is "flink-1.4.2-bin-scala_2.11.tgz" but 
> trying to unpack it fails with:
> {code}
> tar zxvf flink-1.4.2-bin-scala_2.11.tgz 
> gzip: stdin: not in gzip format
> tar: Child returned status 1
> tar: Error is not recoverable: exiting now
> {code}
> {code}
> file flink-1.4.2-bin-scala_2.11.tgz
> flink-1.4.2-bin-scala_2.11.tgz: POSIX tar archive (GNU)
> {code}
> I'd suggest renaming the artifact to flink-1.4.2-bin-scala_2.11.*tar* to 
> make it clearer what is inside and how to unpack it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9273) Class cast exception

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458672#comment-16458672
 ] 

Stephan Ewen commented on FLINK-9273:
-

Can you share the program that caused this exception?

At a first glance, this looks like there is something wrong in your 
application, like using a raw type or doing a wrong unchecked cast from a 
{{StreamSource}} to a {{DataStream}} or so.
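
For illustration, a minimal, self-contained Java example (not the reporter's 
job) showing how a raw type plus an unchecked cast produces exactly this kind 
of runtime {{ClassCastException}}:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class RawTypeCastDemo {

    public static void main(String[] args) {
        List rawList = new ArrayList<String>(); // raw type hides the element type
        rawList.add("not a long");

        @SuppressWarnings("unchecked")
        List<Long> longs = (List<Long>) rawList; // unchecked cast compiles fine

        // Fails at runtime: java.lang.ClassCastException:
        // java.lang.String cannot be cast to java.lang.Long
        Long first = longs.get(0);
        System.out.println(first);
    }
}
{code}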

> Class cast exception
> 
>
> Key: FLINK-9273
> URL: https://issues.apache.org/jira/browse/FLINK-9273
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming, Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> Exception stack is as follows:
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
> at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:307)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> ... 1 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9214) YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458669#comment-16458669
 ] 

ASF GitHub Bot commented on FLINK-9214:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5892
  
Thanks, looks good, merging this...


> YarnClient should be stopped in 
> YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal
> 
>
> Key: FLINK-9214
> URL: https://issues.apache.org/jira/browse/FLINK-9214
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal 
> creates YarnClient without stopping it at the end of the test.
> YarnClient yc should be stopped before returning.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5892: [FLINK-9214] YarnClient should be stopped in YARNSessionC...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5892
  
Thanks, looks good, merging this...


---


[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5928
  
The configuration page (`config.md`) should be generated from the config options 
by now, so it should not be manually edited.

(@zentol could you chime in here?)


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185021293
  
--- Diff: docs/dev/stream/state/checkpointing.md ---
@@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
 
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+- `state.checkpoints.dir`: The target directory for storing checkpoints 
data files and meta data of [externalized checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported 
filesystem. Note: State backend must be accessible from the JobManager, use 
`file://` only for local setups.
--- End diff --

Yes, `file:///` is what you use for many NAS style storage systems, so it 
is not local-only. Let's change this to say that the storage path must be 
accessible from all participating processes/nodes, i.e., all TaskManagers and 
JobManagers.


---


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458620#comment-16458620
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
@twalthr Hi, thanks for your review. I have updated the PR according to 
your suggestions. Changes mainly include:
- Remove changes about UpsertSink
- Refactor test case names and add more tests to cover code paths
- Add more method comments
- Add another base class `NonWindowOuterJoinWithNonEquiPredicates` and move 
corresponding variables and functions into it.
- Split `CRowWrappingMultiOutputCollector` into 
`CRowWrappingMultiOutputCollector` and `LazyOutputCollector`. 

Best, Hequn.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9261) Regression - Flink CLI and Web UI not working when SSL is enabled

2018-04-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458622#comment-16458622
 ] 

Stephan Ewen commented on FLINK-9261:
-

Chesnay's suggestion may fix this, by enabling / disabling REST security 
separately from on-the-wire encryption.

For a clean solution, there is something more needed, because clients now 
communicate via REST with the server, and I think users would very much like 
the client to be authenticated as well when submitting something to a Flink 
server.

The configuration posted above affects the RPC and data plane, meaning:

  - Previously (Flink 1.4) it was TM/TM and JM/TM and Client/JM communication.
  - Now (Flink 1.5+), this affects only TM/TM and JM/TM.


> Regression - Flink CLI and Web UI not working when SSL is enabled
> -
>
> Key: FLINK-9261
> URL: https://issues.apache.org/jira/browse/FLINK-9261
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Network, Web Client
>Affects Versions: 1.5.0
>Reporter: Edward Rojas
>Priority: Blocker
>  Labels: regression
> Fix For: 1.5.0
>
>
> When the *security.ssl.enabled* config is set to true, the Web UI is no longer 
> reachable; there are no logs on the jobmanager. 
>  
> When setting *web.ssl.enabled* to false (keeping security.ssl.enabled to 
> true), the dashboard is not reachable and there is the following exception on 
> jobmanager: 
> {code:java}
> WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - 
> Unhandled exception
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: 
> not an SSL/TLS record: 
> 474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a383038310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a557067726164652d496e7365637572652d52657175657374733a20310d0a557365722d4167656e743a204d6f7a696c6c612f352e3020284d6163696e746f73683b20496e74656c204d6163204f5320582031305f31335f3329204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f36352e302e32352e313831205361666172692f3533372e33360d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c696d6167652f776562702c696d6167652f61706e672c2a2f2a3b713d302e380d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2c656e2d47423b713d302e392c65732d3431393b713d302e382c65733b713d302e372c66722d46523b713d302e362c66723b713d302e350d0a436f6f6b69653a20496465612d39326365626136363d39396464633637632d613838382d346439332d396166612d3737396631373636326264320d0a49662d4d6f6469666965642d53696e63653a205468752c2032362041707220323031382031313a30313a313520474d540d0a0d0a
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:940)
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Also, when trying to use the Flink CLI, it gets stuck on "Waiting for 
> response..." and there are no error messages on the jobmanager. None of the 
> commands works: list, run, etc.
> 
> TaskManagers are able to register with the JobManager, so the SSL configuration 
> is good.
>  
> SSL configuration:
> security.ssl.enabled: true
> security.ssl.keystore: /path/to/keystore
> 

[GitHub] flink issue #5327: [FLINK-8428] [table] Implement stream-stream non-window l...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/5327
  
@twalthr Hi, thanks for your review. I have updated the PR according to 
your suggestions. Changes mainly include:
- Remove changes about UpsertSink
- Refactor test case names and add more tests to cover code paths
- Add more method comments
- Add another base class `NonWindowOuterJoinWithNonEquiPredicates` and move 
corresponding variables and functions into it.
- Split `CRowWrappingMultiOutputCollector` into 
`CRowWrappingMultiOutputCollector` and `LazyOutputCollector`. 

Best, Hequn.


---


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458614#comment-16458614
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995939
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
+
+val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = 

[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995939
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
 expected.add("D,R-8,null")
 StreamITCase.compareWithList(expected)
   }
+
+  /** test non-window inner join **/
+  @Test
+  def testNonWindowInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.clear
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+data1.+=((3, 8L, "Hi9"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+data2.+=((3, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+  .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val sqlQuery =
+  """
+|SELECT t2.a, t2.c, t1.c
+|FROM T1 as t1 JOIN T2 as t2 ON
+|  t1.a = t2.a AND
+|  t1.b > t2.b
+|""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi2",
+  "1,HiHi,Hi3",
+  "1,HiHi,Hi6",
+  "1,HiHi,Hi8",
+  "2,HeHe,Hi5",
+  "null,HeHe,Hi9")
+
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hi,Hallo")
+val results = result.toRetractStream[Row]
+results.addSink(new StreamITCase.RetractingSink)
+env.execute()
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+env.setStateBackend(getStateBackend)
+
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
+
+val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+val result = tEnv.sqlQuery(sqlQuery)
+
+val expected = Seq("Hello world, how are 

[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458610#comment-16458610
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream 
non-window outer Join.
+  *
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param resultType  the output type of join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param isLeftJoin  whether the join is a left outer join
+  * @param queryConfig the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+resultType: TypeInformation[CRow],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+isLeftJoin: Boolean,
+queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+leftType,
+rightType,
+resultType,
+genJoinFuncName,
+genJoinFuncCode,
+queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when 
there are no matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when 
there are no matched rows.
+  protected var rightResultRow: Row = _
+  // how many matched rows from the right table for each left row. Index 0 
is used for left
+  // stream, index 1 is used for right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+super.open(parameters)
+
+leftResultRow = new Row(resultType.getArity)
+rightResultRow = new Row(resultType.getArity)
+
+joinCntState = new Array[MapState[Row, Long]](2)
+val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "leftJoinCnt", leftType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(0) = 
getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+  "rightJoinCnt", rightType, 
Types.LONG.asInstanceOf[TypeInformation[Long]])
+joinCntState(1) = 
getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+* Join current row with other side rows. Preserve current row if there 
are no matched rows
+* from other side.
+*/
+  def preservedJoin(
+  inputRow: Row,
+  inputRowFromLeft: Boolean,
+  otherSideState: MapState[Row, JTuple2[Long, Long]],
+  curProcessTime: Long): Long = {
+
+val otherSideIterator = otherSideState.iterator()
+while (otherSideIterator.hasNext) {
+  val otherSideEntry = otherSideIterator.next()
+  val 

[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458613#comment-16458613
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

I planned to add right join in FLINK-8429. It's OK to add right join in this 
PR if you prefer.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458609#comment-16458609
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
 tableKeys match {
   case Some(keys) => upsertSink.setKeyFields(keys)
   case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-  case None if !isAppendOnlyTable => throw new TableException(
-"UpsertStreamTableSink requires that Table has a full primary 
keys if it is updated.")
+  case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() 
== null =>
--- End diff --

OK.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

I planned to add right join in FLINK-8429. It's OK to add right join in this 
PR if you prefer.


---


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-04-30 Thread hequn8128
Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param isLeftJoin      whether the join is a left outer join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there are no
+  // matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there are no
+  // matched rows.
+  protected var rightResultRow: Row = _
+  // number of matched rows on the other side for each row; index 0 is used for the left
+  // stream, index 1 for the right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+    * Joins the current row with rows from the other side. Preserves the current row if there
+    * are no matched rows on the other side.
+    */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      curProcessTime: Long): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val otherSideRow = otherSideEntry.getKey
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
+      // join
+      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+      callJoinFunction(inputRow, inputRowFromLeft, 
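
The digest cuts this diff off before the review comment it belongs to, but the `joinCntState` declaration above is the heart of the outer-join bookkeeping. As a rough sketch of the idea, and an illustration rather than the PR's code: the null-padded result for a row is emitted while its match count is 0, retracted on the 0 to 1 transition, and re-emitted on the 1 to 0 transition.

{code}
// Illustrative bookkeeping only (not the PR's code): emit or retract the
// null-padded result when a row's match count crosses zero.
def onMatchCountChange(
    oldCnt: Long,
    newCnt: Long,
    paddedResult: Seq[Any],
    emit: (Boolean, Seq[Any]) => Unit): Unit = {  // (isAccumulate, row)
  if (oldCnt == 0L && newCnt > 0L) emit(false, paddedResult)  // first match: retract padded row
  if (oldCnt > 0L && newCnt == 0L) emit(true, paddedResult)   // last match gone: restore padded row
}
{code}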

[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458612#comment-16458612
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995668
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
 ---
@@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
      )
    util.verifyTableTrait(resultTable, expected)
  }
-}
 
+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+    val resultTable = lTable
+      .join(rTable)
+      .where('b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, Acc"
+        ),
+        "false, Acc"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, AccRetract"
+        ),
+        "false, AccRetract"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val countDistinct = new CountDistinct
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+      .groupBy('a)
+      .select('a, countDistinct('c))
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamJoin",
+            "DataStreamScan(true, Acc)",
--- End diff --

`testJoin()` has covered this case.
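
A small worked sequence may help explain the expected traits: the inner join in `testInnerJoinWithoutAgg` only ever accumulates, but a left outer join must retract the null-padded result it emitted for an unmatched left row once a matching right row arrives. Using the schemas from `testLeftJoin` with illustrative values:

{noformat}
left  +(1, 7)     ->  +(1, 7, null)     no right match yet, pad with null
right +(7, "x")   ->  -(1, 7, null)     retract the null-padded result
                      +(1, 7, "x")      then emit the joined result
right -(7, "x")   ->  -(1, 7, "x")      retract the joined result
                      +(1, 7, null)     restore the null-padded result
{noformat}

This is why the join output is marked `false, AccRetract` here, and why the downstream `DataStreamGroupAggregate` in `testAggFollowedWithLeftJoin` has to consume retractions.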


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458607#comment-16458607
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995438
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param isLeftJoin      whether the join is a left outer join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there are no
+  // matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there are no
+  // matched rows.
+  protected var rightResultRow: Row = _
+  // number of matched rows on the other side for each row; index 0 is used for the left
+  // stream, index 1 for the right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+    * Joins the current row with rows from the other side. Preserves the current row if there
+    * are no matched rows on the other side.
+    */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      curProcessTime: Long): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val 

[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458606#comment-16458606
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184995228
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param isLeftJoin      whether the join is a left outer join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there are no
+  // matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there are no
+  // matched rows.
+  protected var rightResultRow: Row = _
+  // number of matched rows on the other side for each row; index 0 is used for the left
+  // stream, index 1 for the right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+    * Joins the current row with rows from the other side. Preserves the current row if there
+    * are no matched rows on the other side.
+    */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      curProcessTime: Long): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val 

[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458603#comment-16458603
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r184994673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Connect data for left stream and right stream. Base class for stream non-window outer join.
+  *
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param resultType      the output type of the join
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  * @param isLeftJoin      whether the join is a left outer join
+  * @param queryConfig     the configuration for the query to generate
+  */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row, all fields from right will be null. Used for output when there are no
+  // matched rows.
+  protected var leftResultRow: Row = _
+  // result row, all fields from left will be null. Used for output when there are no
+  // matched rows.
+  protected var rightResultRow: Row = _
+  // number of matched rows on the other side for each row; index 0 is used for the left
+  // stream, index 1 for the right stream.
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    LOG.debug(s"Instantiating NonWindowOuterJoin")
+  }
+
+  /**
+    * Joins the current row with rows from the other side. Preserves the current row if there
+    * are no matched rows on the other side.
+    */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      curProcessTime: Long): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val 
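
The last few messages quote the same file and are truncated by the digest before the reviewer's remarks, so only the quoted context survives. For orientation, here is a rough, self-contained sketch of the preserved-join pattern that `preservedJoin`'s doc comment describes; it illustrates the technique and is not the PR's method body, and the assumption that the `Long` return value is the match count mirrors, but is not confirmed by, the real signature:

{code}
object PreservedJoinSketch {
  // Join `inputRow` against the other side's rows (kept with a copy count,
  // like the MapState above); if nothing matched, emit the row null-padded.
  // Assumes the input comes from the left; for right input the
  // concatenation order would flip.
  def preservedJoin(
      inputRow: Seq[Any],
      otherSideRows: Map[Seq[Any], Long],            // row -> number of copies
      otherArity: Int,
      joinMatches: (Seq[Any], Seq[Any]) => Boolean,
      emit: Seq[Any] => Unit): Long = {
    var matchedCopies = 0L
    for ((otherRow, copies) <- otherSideRows if joinMatches(inputRow, otherRow)) {
      matchedCopies += copies
      (1L to copies).foreach(_ => emit(inputRow ++ otherRow))  // once per stored copy
    }
    if (matchedCopies == 0L) {
      emit(inputRow ++ Seq.fill[Any](otherArity)(null))  // preserve the unmatched row
    }
    matchedCopies  // assumed: the number of matched rows
  }
}
{code}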
