[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459439#comment-16459439 ]

Gary Yao commented on FLINK-9196:
---------------------------------

[~yuqi] Your patch would not work if the application is submitted in detached mode. When working on tickets, please assign them to yourself in the future.

> YARN: Flink binaries are not deleted from HDFS after cluster shutdown
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9196
>                 URL: https://issues.apache.org/jira/browse/FLINK-9196
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>           Priority: Blocker
>             Labels: flip-6
>             Fix For: 1.5.0
>
>        Attachments: 0001-xxx.patch
>
>
> When deploying on YARN in flip6 mode, the Flink binaries are not deleted from
> HDFS after the cluster shuts down.
>
> *Steps to reproduce*
> # Submit job in YARN job mode, non-detached:
> {noformat}
> HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar
> {noformat}
> # Check contents of {{/user/hadoop/.flink/}} on HDFS after the job is finished:
> {noformat}
> [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls /user/hadoop/.flink/application_1523966184826_0016
> Found 6 items
> -rw-r--r--   1 hadoop hadoop       583 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml
> -rw-r--r--   1 hadoop hadoop       332 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp
> -rw-r--r--   1 hadoop hadoop  89779342 2018-04-02 17:08 /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar
> drwxrwxrwx   - hadoop hadoop         0 2018-04-17 14:54 /user/hadoop/.flink/application_1523966184826_0016/lib
> -rw-r--r--   1 hadoop hadoop      1939 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/log4j.properties
> -rw-r--r--   1 hadoop hadoop      2331 2018-04-02 15:37 /user/hadoop/.flink/application_1523966184826_0016/logback.xml
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
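The issue above is that the per-application staging directory (`/user/hadoop/.flink/<application id>`) should be removed recursively when the cluster shuts down. As a minimal, self-contained sketch of that cleanup, the snippet below models the staging directory with `java.nio.file` on the local filesystem; against real HDFS, the analogous call would be a recursive `delete` on Hadoop's `FileSystem`. The class and method names here are hypothetical stand-ins, not Flink's actual code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical model of the staging-directory cleanup: on shutdown, the
// directory <flink home>/<applicationId>/ and everything under it must go away.
public class StagingDirCleanup {

    /** Recursively deletes the staging directory for one application; returns false if absent. */
    public static boolean deleteApplicationFiles(Path flinkHome, String applicationId) throws IOException {
        Path stagingDir = flinkHome.resolve(applicationId);
        if (!Files.exists(stagingDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            // Sort deepest-first so children are deleted before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path home = Files.createTempDirectory("flink-staging");
        Path app = Files.createDirectories(home.resolve("application_0000000000000_0001").resolve("lib"));
        Files.createFile(app.getParent().resolve("flink-conf.yaml"));
        boolean deleted = deleteApplicationFiles(home, "application_0000000000000_0001");
        System.out.println(deleted && !Files.exists(home.resolve("application_0000000000000_0001")));
    }
}
```

With HDFS the walk/sort dance is unnecessary: `org.apache.hadoop.fs.FileSystem#delete(path, true)` deletes recursively in one call; the point of the sketch is only *when* the cleanup must run, including the detached-mode path mentioned in the comment.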
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459436#comment-16459436 ]

ASF GitHub Bot commented on FLINK-8900:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185163799

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    --- End diff --

    `throwable` isn't used. If `jobResultFuture` cannot be completed exceptionally, `thenAccept` should be used.

> YARN FinalStatus always shows as KILLED with Flip-6
> ---------------------------------------------------
>
>                 Key: FLINK-8900
>                 URL: https://issues.apache.org/jira/browse/FLINK-8900
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Nico Kruber
>            Assignee: Gary Yao
>           Priority: Blocker
>             Labels: flip-6
>             Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c org.apache.flink.streaming.examples.wordcount.WordCount ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns even
> though the program ran successfully, as in this log (irrespective of FLINK-8899
> occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to FINISHED.
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
> 2018-03-08 16:48:39,078 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Streaming WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer -
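GJL's review point above is a general `CompletableFuture` idiom: `whenComplete` forces a `(result, throwable)` callback even when the future is known never to complete exceptionally, while `thenAccept` only receives the successful result. A minimal illustration (not the actual `MiniDispatcher` code; the `ApplicationStatus` enum here is a stand-in):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Contrasts whenComplete and thenAccept on a future that, by assumption,
// can only ever be completed normally.
public class FutureCallbackStyles {
    enum ApplicationStatus { SUCCEEDED, FAILED }

    public static void main(String[] args) {
        CompletableFuture<ApplicationStatus> jobResultFuture = new CompletableFuture<>();
        AtomicReference<ApplicationStatus> seen = new AtomicReference<>();

        // whenComplete: the throwable parameter must be declared even if it is always null.
        jobResultFuture.whenComplete((result, throwable) ->
            System.out.println("whenComplete: result=" + result + ", throwable=" + throwable));

        // thenAccept: only the successful result, matching the "cannot complete
        // exceptionally" invariant stated in the review comment.
        jobResultFuture.thenAccept(seen::set);

        jobResultFuture.complete(ApplicationStatus.SUCCEEDED);
        System.out.println("thenAccept saw: " + seen.get());
    }
}
```

If the future *can* complete exceptionally, `thenAccept` silently skips the failure path, so the choice encodes the invariant rather than just shortening the code.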
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459437#comment-16459437 ]

ASF GitHub Bot commented on FLINK-8900:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185164034

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java ---
    @@ -109,7 +119,11 @@ public MiniDispatcher(
     		if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
     			// terminate the MiniDispatcher once we served the first JobResult successfully
    -			jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown());
    +			jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> {
    +				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
    +					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
    +				jobTerminationFuture.complete(status);
    --- End diff --

    I think the functional way would be:

    ```
    jobTerminationFuture.complete(result.getSerializedThrowable()
        .map(serializedThrowable -> ApplicationStatus.FAILED)
        .orElse(ApplicationStatus.SUCCEEDED));
    ```

> YARN FinalStatus always shows as KILLED with Flip-6
> ---------------------------------------------------
>
>                 Key: FLINK-8900
>                 URL: https://issues.apache.org/jira/browse/FLINK-8900
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Nico Kruber
>            Assignee: Gary Yao
>           Priority: Blocker
>             Labels: flip-6
>             Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c org.apache.flink.streaming.examples.wordcount.WordCount ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns even
> though the program ran successfully, as in this log (irrespective of FLINK-8899
> occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to FINISHED.
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
> 2018-03-08 16:48:39,078 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Streaming WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15:
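The `Optional.map(...).orElse(...)` chain GJL suggests replaces the explicit `isPresent()` ternary with a single expression. A self-contained sketch of that pattern (standing in `Throwable` for Flink's `SerializedThrowable` and using a local `ApplicationStatus` enum, since this is not the actual Flink code):

```java
import java.util.Optional;

// Derive a final application status from an optional failure cause,
// in the functional style suggested in the review comment.
public class StatusFromResult {
    enum ApplicationStatus { SUCCEEDED, FAILED }

    static ApplicationStatus statusFor(Optional<Throwable> serializedThrowable) {
        // Present -> map to FAILED; absent -> orElse falls back to SUCCEEDED.
        return serializedThrowable
            .map(t -> ApplicationStatus.FAILED)
            .orElse(ApplicationStatus.SUCCEEDED);
    }

    public static void main(String[] args) {
        System.out.println(statusFor(Optional.empty()));                     // SUCCEEDED
        System.out.println(statusFor(Optional.of(new RuntimeException())));  // FAILED
    }
}
```

Both forms are equivalent; the functional one avoids naming an intermediate `status` variable and keeps the mapping in one expression.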
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459435#comment-16459435 ]

ASF GitHub Bot commented on FLINK-8900:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5944#discussion_r185163871

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
    @@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
     		}
     	}
     
    +	@Override
    +	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
    +		terminationFuture.whenComplete((status, throwable) ->
    --- End diff --

    `throwable` isn't used. If `terminationFuture` cannot be completed exceptionally, `thenAccept` should be used.

> YARN FinalStatus always shows as KILLED with Flip-6
> ---------------------------------------------------
>
>                 Key: FLINK-8900
>                 URL: https://issues.apache.org/jira/browse/FLINK-8900
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.5.0, 1.6.0
>            Reporter: Nico Kruber
>            Assignee: Gary Yao
>           Priority: Blocker
>             Labels: flip-6
>             Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c org.apache.flink.streaming.examples.wordcount.WordCount ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns even
> though the program ran successfully, as in this log (irrespective of FLINK-8899
> occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to FINISHED.
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
> 2018-03-08 16:48:39,078 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Streaming WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
> 2018-03-08 16:48:39,349 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  org.apache.flink.runtime.blob.TransientBlobCache
[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-7775:
--------------------------
    Description: 
{code}
  public int getNumberOfCachedJobs() {
    return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used. We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
    return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used. We should remove it.

> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-7775
>                 URL: https://issues.apache.org/jira/browse/FLINK-7775
>             Project: Flink
>          Issue Type: Task
>          Components: Local Runtime
>            Reporter: Ted Yu
>            Assignee: vinoyang
>           Priority: Minor
>
> {code}
> public int getNumberOfCachedJobs() {
>   return jobRefCounters.size();
> }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9091:
--------------------------
    Description: 
Got the following when building the flink-json module:
{code}
[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
Failed while enforcing releasability. See above detailed error message.
...
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) on project flink-json: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
{code}

  was:
Got the following when building the flink-json module:
{code}
[WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
Failed while enforcing releasability. See above detailed error message.
...
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) on project flink-json: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
{code}

> Failure while enforcing releasability in building flink-json module
> -------------------------------------------------------------------
>
>                 Key: FLINK-9091
>                 URL: https://issues.apache.org/jira/browse/FLINK-9091
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>            Reporter: Ted Yu
>            Assignee: Hai Zhou
>           Priority: Major
>        Attachments: f-json.out
>
>
> Got the following when building the flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce (dependency-convergence) on project flink-json: Some Enforcer rules have failed. Look above for specific messages explaining why the rule failed. -> [Help 1]
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459237#comment-16459237 ]

ASF GitHub Bot commented on FLINK-8286:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185135031

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +				YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    +			return null;
    +		}
     
    -			// configure local directory
    -			if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -				LOG.info("Overriding YARN's temporary file directories with those " +
    -					"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    -			}
    -			else {
    -				LOG.info("Setting directories for temporary files to: {}", localDirs);
    -				configuration.setString(CoreOptions.TMP_DIRS, localDirs);
    -			}
    -
    -			// tell akka to die in case of an error
    -			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +		// configure local directory
    +		if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +			LOG.info("Overriding YARN's temporary file directories with those " +
    +				"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
    +		}
    +		else {
    +			LOG.info("Setting directories for
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459238#comment-16459238 ]

ASF GitHub Bot commented on FLINK-8286:
---------------------------------------

Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185134529

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
     		SignalHandler.register(LOG);
     		JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -		run(args);
    +		try {
    +			SecurityUtils.getInstalledContext().runSecured(
    +				YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +		} catch (Exception e) {
    +			LOG.error("Exception occurred while launching Task Executor runner", e);
    +			throw new RuntimeException(e);
    +		}
     	}
     
     	/**
    -	 * The instance entry point for the YARN task executor. Obtains user group information and calls
    -	 * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a
    -	 * privileged action.
    +	 * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
     	 *
    -	 * @param args The command line arguments.
    +	 * @param envs environment variables.
     	 */
    -	private static void run(String[] args) {
    -		try {
    -			LOG.debug("All environment variables: {}", ENV);
    +	@VisibleForTesting
    +	protected static Runner create(Map<String, String> envs) {
    +		LOG.debug("All environment variables: {}", envs);
     
    -			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
    -			LOG.info("Current working/local Directory: {}", localDirs);
    +		final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +		LOG.info("Current working/local Directory: {}", localDirs);
     
    -			final String currDir = ENV.get(Environment.PWD.key());
    -			LOG.info("Current working Directory: {}", currDir);
    +		final String currDir = envs.get(Environment.PWD.key());
    +		LOG.info("Current working Directory: {}", currDir);
     
    -			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
    +		final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
     
    -			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
    -
    -			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
    +		final Configuration configuration;
    +		try {
    +			configuration = GlobalConfiguration.loadConfiguration(currDir);
     			FileSystem.initialize(configuration);
    +		} catch (Throwable t) {
    +			LOG.error(t.getMessage(), t);
    --- End diff --

    Good point. Added exceptions to method signature and let caller handle it.

> Fix Flink-Yarn-Kerberos integration for FLIP-6
> ----------------------------------------------
>
>                 Key: FLINK-8286
>                 URL: https://issues.apache.org/jira/browse/FLINK-8286
>             Project: Flink
>          Issue Type: Bug
>          Components: Security
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>           Priority: Blocker
>             Labels: flip-6
>             Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos integration in Flip-6 is broken.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
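The refactoring visible in the diff above has a testability angle worth spelling out: `create(Map<String, String> envs)` takes the environment as an explicit parameter instead of reading a static `System.getenv()` snapshot, so tests can inject fabricated environments (hence the `@VisibleForTesting` annotation). A stripped-down sketch of the same pattern, with a hypothetical key name and return type rather than Flink's real ones:

```java
import java.util.HashMap;
import java.util.Map;

// Environment-as-parameter pattern: the production call site passes
// System.getenv(); tests pass whatever map they need.
public class EnvDrivenFactory {
    static final String KEYTAB_PRINCIPAL = "KEYTAB_PRINCIPAL"; // hypothetical key name

    /** Reads the keytab principal from the supplied environment map. */
    static String keytabPrincipal(Map<String, String> envs) {
        return envs.getOrDefault(KEYTAB_PRINCIPAL, "<none>");
    }

    public static void main(String[] args) {
        Map<String, String> testEnv = new HashMap<>();
        testEnv.put(KEYTAB_PRINCIPAL, "flink/host@EXAMPLE.COM");
        System.out.println(keytabPrincipal(testEnv));         // flink/host@EXAMPLE.COM
        System.out.println(keytabPrincipal(new HashMap<>())); // <none>
    }
}
```

The same inversion is why the review could suggest replacing the `return null` on configuration-load failure with a declared exception: once creation is a plain function of its inputs, the caller decides how failures propagate.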
[jira] [Commented] (FLINK-9256) NPE in SingleInputGate#updateInputChannel() for non-credit based flow control
[ https://issues.apache.org/jira/browse/FLINK-9256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459079#comment-16459079 ] ASF GitHub Bot commented on FLINK-9256: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5914 > NPE in SingleInputGate#updateInputChannel() for non-credit based flow control > - > > Key: FLINK-9256 > URL: https://issues.apache.org/jira/browse/FLINK-9256 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {{SingleInputGate#updateInputChannel()}} fails to update remote partitions > without credit based flow control due to a {{NullPointerException}} from > {{networkBufferPool == null}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9214) YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal
[ https://issues.apache.org/jira/browse/FLINK-9214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459076#comment-16459076 ] ASF GitHub Bot commented on FLINK-9214: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5892 > YarnClient should be stopped in > YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal > > > Key: FLINK-9214 > URL: https://issues.apache.org/jira/browse/FLINK-9214 > Project: Flink > Issue Type: Test >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal > creates YarnClient without stopping it at the end of the test. > YarnClient yc should be stopped before returning. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
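The pattern the ticket asks for is a plain try/finally around the test body. A minimal sketch of that shape, using a hypothetical `StubYarnClient` stand-in (the real Hadoop `YarnClient` needs a running cluster, so this only illustrates the fix's structure, not the merged code):

```java
public class StopInFinally {
    // Hypothetical stand-in for Hadoop's YarnClient: started once, must be stopped.
    static class StubYarnClient {
        boolean running;
        void start() { running = true; }
        void stop() { running = false; }
    }

    // The shape suggested in the ticket: stop the client in a finally block so
    // it is released even when an assertion in the test body throws.
    static StubYarnClient runDetachedPerJobTest() {
        StubYarnClient yc = new StubYarnClient();
        yc.start();
        try {
            // ... assertions against the cluster would go here ...
        } finally {
            yc.stop(); // always reached, even on test failure
        }
        return yc;
    }

    public static void main(String[] args) {
        System.out.println(runDetachedPerJobTest().running); // false
    }
}
```

With the stop call in `finally`, the client no longer leaks when the test body exits early.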
[jira] [Commented] (FLINK-9274) Add thread name to Kafka Partition Discovery
[ https://issues.apache.org/jira/browse/FLINK-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459080#comment-16459080 ] ASF GitHub Bot commented on FLINK-9274: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5942 > Add thread name to Kafka Partition Discovery > > > Key: FLINK-9274 > URL: https://issues.apache.org/jira/browse/FLINK-9274 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > For debugging, threads should have names to filter on and get a quick > overview. The Kafka partition discovery thread(s) currently don't have any > name assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
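Naming a thread at construction time is cheap and makes it stand out in thread dumps. A minimal sketch of the technique — the exact name format used by the actual fix is not quoted in this thread, so the string below is illustrative:

```java
public class NamedDiscoveryThread {
    // Build a discovery thread whose name encodes its purpose and owning task,
    // so it can be filtered in thread dumps and profiler views.
    public static Thread spawnDiscoveryThread(String taskName, Runnable work) {
        Thread t = new Thread(work, "Kafka Partition Discovery for " + taskName);
        t.setDaemon(true); // discovery should not keep the JVM alive
        return t;
    }

    public static void main(String[] args) {
        Thread t = spawnDiscoveryThread("Source: WordCount (1/4)", () -> {});
        System.out.println(t.getName());
    }
}
```

Embedding the task name, as the related FLINK-9275 change does for output flushers, gives each thread a unique, greppable identity.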
[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459077#comment-16459077 ] ASF GitHub Bot commented on FLINK-9196: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5938 > YARN: Flink binaries are not deleted from HDFS after cluster shutdown > - > > Key: FLINK-9196 > URL: https://issues.apache.org/jira/browse/FLINK-9196 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: 0001-xxx.patch > > > When deploying on YARN in flip6 mode, the Flink binaries are not deleted from > HDFS after the cluster shuts down. > *Steps to reproduce* > # Submit job in YARN job mode, non-detached: > {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat} > # Check contents of {{/user/hadoop/.flink/}} on HDFS after > job is finished: > {noformat} > [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls > /user/hadoop/.flink/application_1523966184826_0016 > Found 6 items > -rw-r--r-- 1 hadoop hadoop583 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml > -rw-r--r-- 1 hadoop hadoop332 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp > -rw-r--r-- 1 hadoop hadoop 89779342 2018-04-02 17:08 > /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar > drwxrwxrwx - hadoop hadoop 0 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/lib > -rw-r--r-- 1 hadoop hadoop 1939 2018-04-02 15:37 > /user/hadoop/.flink/application_1523966184826_0016/log4j.properties > -rw-r--r-- 1 hadoop hadoop 2331 2018-04-02 15:37 > /user/hadoop/.flink/application_1523966184826_0016/logback.xml 
> {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9275) Set more distinctive output flusher thread names
[ https://issues.apache.org/jira/browse/FLINK-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459078#comment-16459078 ] ASF GitHub Bot commented on FLINK-9275: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5943 > Set more distinctive output flusher thread names > > > Key: FLINK-9275 > URL: https://issues.apache.org/jira/browse/FLINK-9275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > All output flusher threads are named "OutputFlusher" while at the only place > the {{StreamWriter}} is initialized, we already have the task name at hand. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5914: [FLINK-9256][network] fix NPE in SingleInputGate#u...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5914 ---
[GitHub] flink pull request #5916: [hotfix][tests] remove redundant rebalance in Succ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5916 ---
[GitHub] flink pull request #5938: [FLINK-9196][flip6, yarn] Cleanup application file...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5938 ---
[GitHub] flink pull request #5892: [FLINK-9214] YarnClient should be stopped in YARNS...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5892 ---
[GitHub] flink pull request #5942: [FLINK-9274][kafka] add thread name for partition ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5942 ---
[GitHub] flink pull request #5924: [hotfix][README.md] Update building prerequisites
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5924 ---
[GitHub] flink pull request #5943: [FLINK-9275][streaming] add taskName to the output...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5943 ---
[jira] [Closed] (FLINK-9279) PythonPlanBinderTest flakey
[ https://issues.apache.org/jira/browse/FLINK-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9279. --- Resolution: Duplicate > PythonPlanBinderTest flakey > --- > > Key: FLINK-9279 > URL: https://issues.apache.org/jira/browse/FLINK-9279 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Priority: Critical > > The test fails while trying to create the parent directory {{/tmp/flink}}. > That happens if a file with that name already exists. > The Python Plan binder apparently used a fixed name for the temp directory, but > should use a statistically unique random name instead. > Full test run log: https://api.travis-ci.org/v3/job/373120733/log.txt > Relevant Stack Trace > {code} > Job execution failed. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:898) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: 
java.io.IOException: Mkdirs failed to create /tmp/flink > at > org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121) > at > org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) > at > org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:161) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 59.839 sec > <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest > testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) > Time elapsed: 14.912 sec <<< FAILURE! > java.lang.AssertionError: Error while calling the test program: Job execution > failed. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:161) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at
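The remedy the ticket suggests — a statistically unique directory name instead of the fixed `/tmp/flink` — is exactly what the JDK's `Files.createTempDirectory` provides. A minimal sketch (not the actual Flink fix; the `flink-test-` prefix is illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UniqueTempDir {
    // Ask the JDK for a directory whose name carries a random suffix, so
    // concurrent or stale runs cannot collide on "Mkdirs failed to create".
    public static Path createResultDir() throws IOException {
        return Files.createTempDirectory("flink-test-");
    }

    public static void main(String[] args) throws IOException {
        Path a = createResultDir();
        Path b = createResultDir();
        System.out.println(a.equals(b)); // false: every call yields a fresh directory
    }
}
```

Each call creates a brand-new directory, so the "file with that name already exists" failure mode disappears.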
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459004#comment-16459004 ] ASF GitHub Bot commented on FLINK-8900: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5944 I will try it out. > YARN FinalStatus always shows as KILLED with Flip-6 > --- > > Key: FLINK-8900 > URL: https://issues.apache.org/jira/browse/FLINK-8900 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Whenever I run a simple word count like this one on YARN with Flip-6 > enabled, > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns > even though the program ran successfully like this one (irrespective of > FLINK-8899 occurring or not): > {code} > 2018-03-08 16:48:39,049 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to > FINISHED. > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20 > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:48:39,078 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED. > 2018-03-08 16:48:39,151 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager. 
> 2018-03-08 16:48:39,221 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(11a794d2f5dc2955d8015625ec300c20). > 2018-03-08 16:48:39,270 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > 43f725adaee14987d3ff99380701f52f: JobManager is shutting down.. > 2018-03-08 16:48:39,270 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 > for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:48:39,775 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager. > 2018-03-08 16:48:39,846 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager. > 2018-03-08 16:48:39,876 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. > 2018-03-08 16:48:39,910 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager. > 2018-03-08 16:48:39,942 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager. 
> 2018-03-08 16:48:39,974 INFO org.apache.flink.runtime.blob.BlobServer > - Stopped BLOB server at 0.0.0.0:46511 > 2018-03-08 16:48:39,975 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting down > BLOB cache > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5944 I will try it out. ---
[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458986#comment-16458986 ] ASF GitHub Bot commented on FLINK-9222: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5900 I'm pretty sure I got all this covered by using a separate "configuration": `myShadowJar` - not the standard dependency configuration*, but does the trick: Everything added to `myShadowJar` inside the `dependencies` gets added to the shaded jar (here: shadowJar), except for the explicit excludes which work with transitive dependencies as well (defined in the lines starting with `myShadowJar.exclude group:` where I included the same things as in the maven shade configuration of the quickstart). All user-code dependencies should be put into `myShadowJar` - maybe I should make this even more explicit in the gradle build file. - nothing is relocated - there's stuff packed into the jar and other stuff that isn't, that's it :) (should be the same - I did compare the jar with the one from maven with and without the kafka connector dependency as a test) - Flink core dependencies are excluded from the uber jar by not putting them into `myShadowJar` - with the trick of using `myShadowJar`, not only is IntelliJ able to run the job and the tests, it also runs from the command line * Unfortunately, I could not use the `shadow`/`compileOnly` dependency configurations which are standard for this in gradle because then the program would not run in IntelliJ or via `gradle run`. It would expect the environment to provide the dependencies which it does not there. Alternatives/fixes for this broke the transitive dependency exclusion which is, however, scheduled for some future version of the gradle shadow plugin. There are a lot of enhancement requests in this regard, e.g. 
https://github.com/johnrengelman/shadow/issues/159 > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Project Website, Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but there is none for Gradle > and Gradle users get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5900 I'm pretty sure I got all this covered by using a separate "configuration": `myShadowJar` - not the standard dependency configuration*, but does the trick: Everything added to `myShadowJar` inside the `dependencies` gets added to the shaded jar (here: shadowJar), except for the explicit excludes which work with transitive dependencies as well (defined in the lines starting with `myShadowJar.exclude group:` where I included the same things as in the maven shade configuration of the quickstart). All user-code dependencies should be put into `myShadowJar` - maybe I should make this even more explicit in the gradle build file. - nothing is relocated - there's stuff packed into the jar and other stuff that isn't, that's it :) (should be the same - I did compare the jar with the one from maven with and without the kafka connector dependency as a test) - Flink core dependencies are excluded from the uber jar by not putting them into `myShadowJar` - with the trick of using `myShadowJar`, not only is IntelliJ able to run the job and the tests, it also runs from the command line * Unfortunately, I could not use the `shadow`/`compileOnly` dependency configurations which are standard for this in gradle because then the program would not run in IntelliJ or via `gradle run`. It would expect the environment to provide the dependencies which it does not there. Alternatives/fixes for this broke the transitive dependency exclusion which is, however, scheduled for some future version of the gradle shadow plugin. There are a lot of enhancement requests in this regard, e.g. https://github.com/johnrengelman/shadow/issues/159 ---
[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart
[ https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458960#comment-16458960 ] ASF GitHub Bot commented on FLINK-9222: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5900 Nice addition! A few things I would like to double check on the quickstart configuration (I am not fluent enough in Gradle): - We do not need to hide/shade any dependencies in the user code. In Maven, we use the shade plugin, but only to build an uber jar, not to actually relocate dependencies. Is that the same in the Gradle quickstart? - The Flink core dependencies need to be in a scope equivalent to "provided", so they do not end up in the uber jar. Can we do something similar in Gradle? This has been a frequent source of unnecessarily bloated application jars. - The Maven quickstart template uses a trick to make sure that the provided dependencies are still in the classpath when we run the program in the IDE: A profile that activates in IDEA (by a property variable) and alters the scope from *provided* to *compile*. Not sure if that is strictly necessary, but may be helpful. > Add a Gradle Quickstart > --- > > Key: FLINK-9222 > URL: https://issues.apache.org/jira/browse/FLINK-9222 > Project: Flink > Issue Type: Improvement > Components: Project Website, Quickstarts >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > > Having a proper project template helps a lot in getting dependencies right. > For example, setting the core dependencies to "provided", the connector / > library dependencies to "compile", etc. > The Maven quickstarts are in good shape by now, but there is none for Gradle > and Gradle users get this wrong quite often. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5900 Nice addition! A few things I would like to double check on the quickstart configuration (I am not fluent enough in Gradle): - We do not need to hide/shade any dependencies in the user code. In Maven, we use the shade plugin, but only to build an uber jar, not to actually relocate dependencies. Is that the same in the Gradle quickstart? - The Flink core dependencies need to be in a scope equivalent to "provided", so they do not end up in the uber jar. Can we do something similar in Gradle? This has been a frequent source of unnecessarily bloated application jars. - The Maven quickstart template uses a trick to make sure that the provided dependencies are still in the classpath when we run the program in the IDE: A profile that activates in IDEA (by a property variable) and alters the scope from *provided* to *compile*. Not sure if that is strictly necessary, but may be helpful. ---
[jira] [Commented] (FLINK-9256) NPE in SingleInputGate#updateInputChannel() for non-credit based flow control
[ https://issues.apache.org/jira/browse/FLINK-9256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458923#comment-16458923 ] ASF GitHub Bot commented on FLINK-9256: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5914 Change looks very good, thanks! Merging this... We can probably remove most of this code again later, once we drop the non-credit-based code paths in the next releases. But that still makes this a necessary fix for now... > NPE in SingleInputGate#updateInputChannel() for non-credit based flow control > - > > Key: FLINK-9256 > URL: https://issues.apache.org/jira/browse/FLINK-9256 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > {{SingleInputGate#updateInputChannel()}} fails to update remote partitions > without credit based flow control due to a {{NullPointerException}} from > {{networkBufferPool == null}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5914: [FLINK-9256][network] fix NPE in SingleInputGate#updateIn...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5914 Change looks very good, thanks! Merging this... We can probably remove most of this code again later, once we drop the non-credit-based code paths in the next releases. But that still makes this a necessary fix for now... ---
[GitHub] flink issue #5916: [hotfix][tests] remove redundant rebalance in SuccessAfte...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5916 Merging... ---
[jira] [Commented] (FLINK-9274) Add thread name to Kafka Partition Discovery
[ https://issues.apache.org/jira/browse/FLINK-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458916#comment-16458916 ] ASF GitHub Bot commented on FLINK-9274: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5942 Good fix, thanks! Merging... > Add thread name to Kafka Partition Discovery > > > Key: FLINK-9274 > URL: https://issues.apache.org/jira/browse/FLINK-9274 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > For debugging, threads should have names to filter on and get a quick > overview. The Kafka partition discovery thread(s) currently don't have any > name assigned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5942: [FLINK-9274][kafka] add thread name for partition discove...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5942 Good fix, thanks! Merging... ---
[jira] [Commented] (FLINK-9275) Set more distinctive output flusher thread names
[ https://issues.apache.org/jira/browse/FLINK-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458913#comment-16458913 ] ASF GitHub Bot commented on FLINK-9275: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5943 Good fix, thanks, merging... > Set more distinctive output flusher thread names > > > Key: FLINK-9275 > URL: https://issues.apache.org/jira/browse/FLINK-9275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > > All output flusher threads are named "OutputFlusher" while at the only place > the {{StreamWriter}} is initialized, we already have the task name at hand. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5943: [FLINK-9275][streaming] add taskName to the output flushe...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5943 Good fix, thanks, merging... ---
[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458908#comment-16458908 ] ASF GitHub Bot commented on FLINK-9196: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5938 Merging this... > YARN: Flink binaries are not deleted from HDFS after cluster shutdown > - > > Key: FLINK-9196 > URL: https://issues.apache.org/jira/browse/FLINK-9196 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: 0001-xxx.patch > > > When deploying on YARN in flip6 mode, the Flink binaries are not deleted from > HDFS after the cluster shuts down. > *Steps to reproduce* > # Submit job in YARN job mode, non-detached: > {noformat} HADOOP_CLASSPATH=`hadoop classpath` bin/flink run -m yarn-cluster > -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar {noformat} > # Check contents of {{/user/hadoop/.flink/}} on HDFS after > job is finished: > {noformat} > [hadoop@ip-172-31-43-78 flink-1.5.0]$ hdfs dfs -ls > /user/hadoop/.flink/application_1523966184826_0016 > Found 6 items > -rw-r--r-- 1 hadoop hadoop583 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/90cf5b3a-039e-4d52-8266-4e9563d74827-taskmanager-conf.yaml > -rw-r--r-- 1 hadoop hadoop332 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/application_1523966184826_0016-flink-conf.yaml3818971235442577934.tmp > -rw-r--r-- 1 hadoop hadoop 89779342 2018-04-02 17:08 > /user/hadoop/.flink/application_1523966184826_0016/flink-dist_2.11-1.5.0.jar > drwxrwxrwx - hadoop hadoop 0 2018-04-17 14:54 > /user/hadoop/.flink/application_1523966184826_0016/lib > -rw-r--r-- 1 hadoop hadoop 1939 2018-04-02 15:37 > /user/hadoop/.flink/application_1523966184826_0016/log4j.properties > -rw-r--r-- 1 hadoop hadoop 2331 2018-04-02 15:37 > 
/user/hadoop/.flink/application_1523966184826_0016/logback.xml > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9196) YARN: Flink binaries are not deleted from HDFS after cluster shutdown
[ https://issues.apache.org/jira/browse/FLINK-9196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458906#comment-16458906 ] ASF GitHub Bot commented on FLINK-9196: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5938#discussion_r185080370

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -570,6 +571,21 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc
 		});
 	}

+	@Override
+	public void shutDownCluster() {
+		try {
+			sendRetryableRequest(
+				ShutdownHeaders.getInstance(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				isConnectionProblemException()).get();
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		} catch (ExecutionException e) {
+			log.error("Error while shutting down cluster", e);
--- End diff --

Throw the cause of the `ExecutionException`?

> YARN: Flink binaries are not deleted from HDFS after cluster shutdown > - > > Key: FLINK-9196 > URL: https://issues.apache.org/jira/browse/FLINK-9196 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: 0001-xxx.patch > > > When deploying on YARN in flip6 mode, the Flink binaries are not deleted from > HDFS after the cluster shuts down. 
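The diff comment above suggests rethrowing the cause of the `ExecutionException` rather than only logging it. A minimal sketch of that unwrap-and-rethrow pattern, with illustrative names (not Flink's actual `RestClusterClient` code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ShutdownSketch {
    // Unwrap the ExecutionException and surface its cause to the caller,
    // rather than swallowing it with a log statement.
    static void awaitShutdown(CompletableFuture<Void> request) throws Exception {
        try {
            request.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw new RuntimeException(cause);
        }
    }
}
```

With this shape, a failure during shutdown reaches the caller as the original exception instead of disappearing into the log.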
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458893#comment-16458893 ] ASF GitHub Bot commented on FLINK-8900: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5944 The test failure is unrelated - unrelated test flakeyness > YARN FinalStatus always shows as KILLED with Flip-6 > --- > > Key: FLINK-8900 > URL: https://issues.apache.org/jira/browse/FLINK-8900 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Whenever I run a simple simple word count like this one on YARN with Flip-6 > enabled, > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns > even though the program ran successfully like this one (irrespective of > FLINK-8899 occurring or not): > {code} > 2018-03-08 16:48:39,049 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to > FINISHED. > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20 > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:48:39,078 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED. > 2018-03-08 16:48:39,151 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager. 
> 2018-03-08 16:48:39,221 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(11a794d2f5dc2955d8015625ec300c20). > 2018-03-08 16:48:39,270 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > 43f725adaee14987d3ff99380701f52f: JobManager is shutting down.. > 2018-03-08 16:48:39,270 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 > for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:48:39,775 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager. > 2018-03-08 16:48:39,846 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager. > 2018-03-08 16:48:39,876 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. > 2018-03-08 16:48:39,910 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager. > 2018-03-08 16:48:39,942 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager. 
> 2018-03-08 16:48:39,974 INFO org.apache.flink.runtime.blob.BlobServer > - Stopped BLOB server at 0.0.0.0:46511 > 2018-03-08 16:48:39,975 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting down > BLOB cache > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5931 @GJL Briefly digging through the log, there are a few strange things happening:
- `YarnResourceManager` still has 8 pending requests even when 11 containers are running: ```Received new container: container_1524853016208_0001_01_000184 - Remaining pending container requests: 8```
- Some slots are requested and then the requests are cancelled again
- In the end, one request is not fulfilled: `aeec2a9f010a187e04e31e6efd6f0f88`

Might be an inconsistency in either the `SlotManager` or the `SlotPool`. ---
[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458891#comment-16458891 ] ASF GitHub Bot commented on FLINK-9190: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5931 @GJL Briefly digging through the log, there are a few strange things happening: - `YarnResourceManager` still has 8 pending requests even when 11 containers are running: ```Received new container: container_1524853016208_0001_01_000184 - Remaining pending container requests: 8``` - Some slots are requested and then the requests are cancelled again - In the end, one request is not fulfilled: `aeec2a9f010a187e04e31e6efd6f0f88` Might be an inconsistency in either in the `SlotManager` or `SlotPool`. > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master. Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. 
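The expected behavior described above — every allocated container retires exactly one pending request, and every completed container triggers a replacement request even if its TaskManager never registered — can be illustrated with a toy bookkeeping class. This is entirely hypothetical and not the `YarnResourceManager` implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PendingRequests {
    private final Deque<String> pending = new ArrayDeque<>();

    void requestContainer(String requestId) {
        pending.add(requestId);
    }

    // Each allocation must retire one outstanding request; returns how many
    // requests remain pending afterwards.
    int onContainerAllocated() {
        if (!pending.isEmpty()) {
            pending.poll();
        }
        return pending.size();
    }

    // A failed container leaves the cluster short one slot, so request a
    // replacement regardless of whether the TaskManager ever registered.
    int onContainerCompleted() {
        pending.add("replacement");
        return pending.size();
    }
}
```

The bug report suggests the second rule was not always applied when containers failed before registration.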
[jira] [Created] (FLINK-9279) PythonPlanBinderTest flaky
Stephan Ewen created FLINK-9279: --- Summary: PythonPlanBinderTest flaky Key: FLINK-9279 URL: https://issues.apache.org/jira/browse/FLINK-9279 Project: Flink Issue Type: Bug Components: Python API, Tests Affects Versions: 1.5.0 Reporter: Stephan Ewen The test fails while trying to create the parent directory {{/tmp/flink}}. That happens if a file with that name already exists. The Python Plan binder apparently used a fixed name for the temp directory, but should use a statistically unique random name instead. Full test run log: https://api.travis-ci.org/v3/job/373120733/log.txt Relevant Stack Trace {code} Job execution failed. org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:898) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:841) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: Mkdirs failed to create /tmp/flink at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121) at 
org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.CsvOutputFormat.open(CsvOutputFormat.java:161) at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 59.839 sec <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) Time elapsed: 14.912 sec <<< FAILURE! java.lang.AssertionError: Error while calling the test program: Job execution failed. at org.junit.Assert.fail(Assert.java:88) at org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:161) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at
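The fix direction the report describes — replacing the fixed `/tmp/flink` name with a uniquely named directory — can be sketched with the JDK's own facility. This illustrates the idea only, not the actual PythonPlanBinder change:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class UniqueTempDir {
    // A fixed path such as /tmp/flink fails when a file with that name
    // already exists; createTempDirectory generates a fresh, unique name on
    // every call, so repeated or concurrent test runs cannot collide.
    static Path create() throws IOException {
        return Files.createTempDirectory("flink-test-");
    }
}
```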
[jira] [Created] (FLINK-9278) Allow restore savepoint with some SQL queries added/removed
Adrian Hains created FLINK-9278: --- Summary: Allow restore savepoint with some SQL queries added/removed Key: FLINK-9278 URL: https://issues.apache.org/jira/browse/FLINK-9278 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.4.2 Reporter: Adrian Hains We are running a Flink job that contains multiple SQL queries. This is configured by calling sqlQuery(String) one time for each SQL query, on a single instance of StreamTableEnvironment. The queries are simple aggregations with a tumble window. Currently I can configure my environment with queries Q1, Q2, and Q3, create a savepoint, and restart the job from that savepoint if the same set of SQL queries are used. If I remove some queries and add some others, Q2, Q4, and Q3, I am unable to restart the job from the same savepoint. This behavior is expected, as the documentation clearly describes that the operator IDs are generated if they are not explicitly defined, and they cannot be explicitly defined when using flink SQL. I would like to be able to specify a scoping operator id prefix when registering a SQL query to a StreamTableEnvironment. This can then be used to programmatically generate unique IDs for each of the operators created to execute the SQL queries. For example, if I specify a prefix of "ID:Q2:" for my Q2 query, and I restart the job with an identical SQL query for this prefix, then I would be able to restore the state for this query even in the presence of other queries being added or removed to the job graph. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9277) Reduce noisiness of SlotPool logging
[ https://issues.apache.org/jira/browse/FLINK-9277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458828#comment-16458828 ] Chesnay Schepler commented on FLINK-9277: - Kind of a duplicate of FLINK-9215. In this [PR|https://github.com/apache/flink/pull/5879] we've been thinking of adding a separate exception class used for the expected life-cycle of resources, which doesn't print a stacktrace. > Reduce noisiness of SlotPool logging > > > Key: FLINK-9277 > URL: https://issues.apache.org/jira/browse/FLINK-9277 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Critical > > The slot pool logs a very large amount of stack traces with meaningless > exceptions like {code} > org.apache.flink.util.FlinkException: Release multi task slot because all > children have been released. > {code} > This makes log parsing very hard. > For an example, see this log: > https://gist.githubusercontent.com/GJL/3b109db48734ff40103f47d04fc54bd3/raw/e3afc0ec3f452bad681e388016bcf799bba56f10/gistfile1.txt
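The idea floated in that PR — a dedicated exception type for expected resource life-cycle events that does not record a stack trace — can be sketched like this. The class name is hypothetical, not an actual Flink class:

```java
// Passing writableStackTrace = false to the Throwable constructor disables
// stack-trace capture entirely, so logging this exception prints only its
// message, keeping routine slot-release events out of the log noise.
public class ExpectedLifecycleException extends Exception {
    public ExpectedLifecycleException(String message) {
        super(message, null, /* enableSuppression */ false, /* writableStackTrace */ false);
    }
}
```

Skipping stack-trace capture also makes constructing such exceptions cheap, which matters when they are thrown on every normal slot release.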
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458785#comment-16458785 ] ASF GitHub Bot commented on FLINK-8286: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185050544 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. 
*/ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); --- End diff -- Why is this exception being swallowed? 
> Fix Flink-Yarn-Kerberos integration for FLIP-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Bug > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current Flink-Yarn-Kerberos in Flip-6 is broken. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
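The review question above ("Why is this exception being swallowed?") points at a load path that logs the error and returns `null`. A hedged sketch of the fail-fast alternative, using `java.util.Properties` as a stand-in for Flink's `Configuration`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ConfigLoad {
    // Propagate the failure instead of logging it and returning null, so the
    // caller cannot silently continue without a configuration.
    static Properties loadOrFail(Path file) {
        try (InputStream in = Files.newInputStream(file)) {
            Properties props = new Properties();
            props.load(in);
            return props;
        } catch (IOException e) {
            throw new IllegalStateException("Could not load configuration from " + file, e);
        }
    }
}
```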
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458784#comment-16458784 ] ASF GitHub Bot commented on FLINK-8286: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185052704 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** -* The instance entry point for the YARN task executor. Obtains user group information and calls -* the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a -* privileged action. +* Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * -* @param args The command line arguments. +* @param envs environment variables. 
*/ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Mapenvs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - 
configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for
[jira] [Created] (FLINK-9277) Reduce noisiness of SlotPool logging
Stephan Ewen created FLINK-9277: --- Summary: Reduce noisiness of SlotPool logging Key: FLINK-9277 URL: https://issues.apache.org/jira/browse/FLINK-9277 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Stephan Ewen Assignee: Till Rohrmann The slot pool logs a very large amount of stack traces with meaningless exceptions like {code} org.apache.flink.util.FlinkException: Release multi task slot because all children have been released. {code} This makes log parsing very hard. For an example, see this log: https://gist.githubusercontent.com/GJL/3b109db48734ff40103f47d04fc54bd3/raw/e3afc0ec3f452bad681e388016bcf799bba56f10/gistfile1.txt
[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458775#comment-16458775 ] ASF GitHub Bot commented on FLINK-9269: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5934 Thanks for your comments, @StephanEwen. If I am not misunderstanding, we don't need to duplicate the serializer now, because we will have a dedicated optimization for it in the near future; I am `+1` for that. Then, what about the concurrency problem caused by the `stateTables`? It's an obvious bug that multiple threads could access the `stateTables` concurrently, and one of them could modify them. But so far, no users have reported that problem, maybe because most users run the `RocksDBKeyedBackend` in production instead of the `HeapKeyedStateBackend`, so I think this is not an urgent bug, but it's still a bug. Should it be fixed for 1.5? > Concurrency problem in HeapKeyedStateBackend when performing checkpoint async > - > > Key: FLINK-9269 > URL: https://issues.apache.org/jira/browse/FLINK-9269 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > {code:java} > @Nonnull > @Override > protected SnapshotResult performOperation() throws > Exception { > // do something >long[] keyGroupRangeOffsets = new > long[keyGroupRange.getNumberOfKeyGroups()]; >for (int keyGroupPos = 0; keyGroupPos < > keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { > int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); > keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); > outView.writeInt(keyGroupId); > for (Map.Entry> kvState : > stateTables.entrySet()) { > // do something > } > } > // do something > } > {code}
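A general sketch of one way to address the race the reporter describes: take a consistent copy of the shared tables under a lock before the asynchronous checkpoint thread iterates them. This illustrates the pattern only, not the actual `HeapKeyedStateBackend` fix:

```java
import java.util.HashMap;
import java.util.Map;

public class StateTablesSketch {
    private final Object lock = new Object();
    private final Map<String, String> stateTables = new HashMap<>();

    // Task-thread writes and the snapshot synchronize on the same lock, so
    // the async checkpoint thread iterates a private copy instead of the
    // live map that processing may concurrently modify.
    void update(String name, String value) {
        synchronized (lock) {
            stateTables.put(name, value);
        }
    }

    Map<String, String> snapshot() {
        synchronized (lock) {
            return new HashMap<>(stateTables);
        }
    }
}
```

The copy makes the snapshot immune to later modifications, at the cost of a short pause on the task thread while it is taken.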
[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5934 Thanks for your comments, @StephanEwen. If I am not misunderstanding, we don't need to duplicate the serializer now, because we will have a dedicated optimization for it in the near future; I am `+1` for that. Then, what about the concurrency problem caused by the `stateTables`? It is an obvious bug that multiple threads can access `stateTables` concurrently, and one of them can modify it. But so far no users have reported the problem, maybe because most users run the `RocksDBKeyedStateBackend` in production instead of the `HeapKeyedStateBackend`, so I think this is not an urgent bug. Still, it is a bug; should it be fixed for 1.5? ---
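One way to avoid the race discussed above, copying the state-table map in the synchronous checkpoint phase so the async snapshot thread never iterates a map the task thread is still mutating, can be sketched as follows. This is illustrative Java under assumed names, not Flink's actual fix:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the task thread may register new states at any time,
// so the synchronous checkpoint phase hands a shallow copy of the map to the
// async snapshot thread instead of the live map.
public class StateTableSnapshotSketch {
    private final Map<String, String> stateTables = new HashMap<>();

    public void registerState(String name, String table) {
        // called from the task thread, possibly while a checkpoint is running
        stateTables.put(name, table);
    }

    /** Called in the synchronous phase; the returned copy is safe to iterate asynchronously. */
    public Map<String, String> snapshotTables() {
        return new HashMap<>(stateTables);
    }

    public static void main(String[] args) {
        StateTableSnapshotSketch backend = new StateTableSnapshotSketch();
        backend.registerState("window-contents", "table-1");
        Map<String, String> snapshot = backend.snapshotTables();
        backend.registerState("late-registered-state", "table-2"); // concurrent registration
        System.out.println(snapshot.size()); // the snapshot is unaffected by the later put
    }
}
```

The copy is shallow, which is enough here: the race in the ticket is about the map structure itself being modified, not the individual tables.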
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458774#comment-16458774 ] Aljoscha Krettek commented on FLINK-8286: - [~suez1224] Could you describe a bit what the problem was and how this fixes it? I'm also mostly interested in why {{YARNSessionFIFOSecuredITCase}} didn't fail, for example. > Fix Flink-Yarn-Kerberos integration for FLIP-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Bug > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current Flink-Yarn-Kerberos in Flip-6 is broken. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9276) Improve error message when TaskManager fails
Stephan Ewen created FLINK-9276: --- Summary: Improve error message when TaskManager fails Key: FLINK-9276 URL: https://issues.apache.org/jira/browse/FLINK-9276 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Stephan Ewen When a TaskManager fails, we frequently get a message {code} org.apache.flink.util.FlinkException: Releasing TaskManager container_1524853016208_0001_01_000102 {code} This message is misleading in that it sounds like an intended operation, when it really is a failure of a container that the {{ResourceManager}} reports to the {{JobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
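A hedged sketch of the kind of message the ticket asks for. The wording and helper are purely illustrative, not a proposed Flink API:

```java
// Illustrative only: phrase the message so the release reads as a *reaction*
// to a failed container reported by the ResourceManager, not as an intended
// operation.
public class TaskManagerFailureMessage {
    static String describe(String containerId, String cause) {
        return "TaskManager " + containerId + " lost: " + cause
                + " (failure reported by the ResourceManager; releasing its resources)";
    }

    public static void main(String[] args) {
        System.out.println(describe("container_1524853016208_0001_01_000102",
                "container terminated on its host"));
    }
}
```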
[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458767#comment-16458767 ] ASF GitHub Bot commented on FLINK-9190: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5931 Hi @GJL, is it possible that the cause is the same as in the previous PR for this ticket: the container started successfully and connected to the ResourceManager, but the TM was killed before it could connect to the JobManager? In that case, even though there are enough TMs, the JobManager won't fire any new request, and the ResourceManager doesn't know that the container it assigned to the JobManager has been killed either, so both the JobManager and the ResourceManager will do nothing but wait for the timeout... What do you think?
> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, YARN
> Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
> Reporter: Gary Yao
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is restarted due to {{NoResourceAvailableException}}, and the job runs normally afterwards. I suspect that {{TaskManager}} failures are not registered if the failure occurs before the {{TaskManager}} registers with the master. Logs are attached; I added additional log statements to {{YarnResourceManager.onContainersCompleted}} and {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed and keep requesting new containers. The job should run as soon as resources are available.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5931 Hi @GJL, is it possible that the cause is the same as in the previous PR for this ticket: the container started successfully and connected to the ResourceManager, but the TM was killed before it could connect to the JobManager? In that case, even though there are enough TMs, the JobManager won't fire any new request, and the ResourceManager doesn't know that the container it assigned to the JobManager has been killed either, so both the JobManager and the ResourceManager will do nothing but wait for the timeout... What do you think? ---
[jira] [Updated] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9269: -- Priority: Major (was: Blocker)
> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
>     // do something
>     long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
>     for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>         int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>         keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>         outView.writeInt(keyGroupId);
>         for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
>             // do something
>         }
>     }
>     // do something
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458756#comment-16458756 ] Sihua Zhou commented on FLINK-9269: --- Hi [~aljoscha], I haven't found a symptom there yet, but I think I can trigger the concurrency problem very easily: it is an obvious bug that multiple threads can access `stateTables` concurrently, and one of them can modify it. But so far no users have reported the problem, maybe because most users run the RocksDBKeyedStateBackend in production instead of the HeapKeyedStateBackend. I am now going to remove this from the Blocker list.
> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
>     // do something
>     long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
>     for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>         int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>         keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>         outView.writeInt(keyGroupId);
>         for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
>             // do something
>         }
>     }
>     // do something
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458752#comment-16458752 ] ASF GitHub Bot commented on FLINK-8900: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5944 [FLINK-8900] [yarn] Set correct application status when job is finished

## What is the purpose of the change

When finite Flink applications (batch jobs) are sent to YARN in detached mode, the final status is currently always the same, because the job's result is not passed to the logic that initiates the application shutdown. This PR forwards the final job status via a future that is used to register the shutdown handlers.

## Brief change log

- Introduce the `JobTerminationFuture` in the `MiniDispatcher`

## Verifying this change

```
bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar
```

- Run the batch job as described above on YARN so that it succeeds; check that the final application status is successful.
- Run the batch job with a parameter pointing to a non-existing input file on YARN; check that the final application status is failed.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink yarn_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5944

commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7 Author: Stephan Ewen Date: 2018-04-30T07:55:50Z [hotfix] [tests] Update log4j-test.properties Brings the logging definition in sync with other projects. Updates the classname for the suppressed logger in Netty to account for the new shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23 Author: Stephan Ewen Date: 2018-04-27T16:57:27Z [FLINK-8900] [yarn] Set correct application status when job is finished

> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Nico Kruber
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c org.apache.flink.streaming.examples.wordcount.WordCount ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns even though the program ran successfully like this one (irrespective of FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Streaming WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to FINISHED.
> 2018-03-08 16:48:39,050 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
> 2018-03-08 16:48:39,078 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register TaskManager e58efd886429e8f080815ea74ddfa734 at the
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5944 [FLINK-8900] [yarn] Set correct application status when job is finished

## What is the purpose of the change

When finite Flink applications (batch jobs) are sent to YARN in detached mode, the final status is currently always the same, because the job's result is not passed to the logic that initiates the application shutdown. This PR forwards the final job status via a future that is used to register the shutdown handlers.

## Brief change log

- Introduce the `JobTerminationFuture` in the `MiniDispatcher`

## Verifying this change

```
bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar
```

- Run the batch job as described above on YARN so that it succeeds; check that the final application status is successful.
- Run the batch job with a parameter pointing to a non-existing input file on YARN; check that the final application status is failed.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink yarn_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5944

commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7 Author: Stephan Ewen Date: 2018-04-30T07:55:50Z [hotfix] [tests] Update log4j-test.properties Brings the logging definition in sync with other projects. Updates the classname for the suppressed logger in Netty to account for the new shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23 Author: Stephan Ewen Date: 2018-04-27T16:57:27Z [FLINK-8900] [yarn] Set correct application status when job is finished ---
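The mechanism the PR describes, forwarding the job's result through a future on which the shutdown handlers are registered, can be sketched roughly as below. The class, enum, and method names are illustrative stand-ins, not Flink's real API:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the idea: the dispatcher completes a termination
// future with the job's outcome, and the shutdown handler (registered up
// front) derives the final YARN application status from it, instead of
// always reporting the same status.
public class TerminationFutureSketch {
    enum AppStatus { SUCCEEDED, FAILED }

    static AppStatus toAppStatus(boolean jobSucceeded) {
        return jobSucceeded ? AppStatus.SUCCEEDED : AppStatus.FAILED;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> jobTerminationFuture = new CompletableFuture<>();
        // shutdown handler is registered before the job finishes
        CompletableFuture<AppStatus> finalStatus =
                jobTerminationFuture.thenApply(TerminationFutureSketch::toAppStatus);
        jobTerminationFuture.complete(true); // the job finished successfully
        System.out.println(finalStatus.join()); // SUCCEEDED
    }
}
```

The key design point is ordering: the handler is attached before completion, so whichever thread eventually completes the future also triggers the status propagation.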
[jira] [Commented] (FLINK-9174) The type of state created in ProcessWindowFunction.process() is inconsistent
[ https://issues.apache.org/jira/browse/FLINK-9174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458736#comment-16458736 ] ASF GitHub Bot commented on FLINK-9174: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5847 @aljoscha Thanks for your review, I have addressed your comments.
> The type of state created in ProcessWindowFunction.process() is inconsistent
> ---
>
> Key: FLINK-9174
> URL: https://issues.apache.org/jira/browse/FLINK-9174
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.5.0
>
>
> The type of state created from windowState and globalState in {{ProcessWindowFunction.process()}} is inconsistent. In detail,
> {code}
> context.windowState().getListState(); // return type is HeapListState or RocksDBListState
> context.globalState().getListState(); // return type is UserFacingListState
> {code}
> This causes a problem in the following code,
> {code}
> Iterable<T> iterableState = listState.get();
> if (iterableState.iterator().hasNext()) {
>     for (T value : iterableState) {
>         value.setRetracting(true);
>         collector.collect(value);
>     }
>     state.clear();
> }
> {code}
> If the {{listState}} is created from {{context.globalState()}} it is fine, but when it is created from {{context.windowState()}} this will cause an NPE. I met this in 1.3.2 but I found it also affects 1.5.0.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
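The NPE described in the ticket comes from one state flavor returning null for an empty list while the other returns an empty iterable. On the user side, a defensive null check avoids it; this is an illustrative workaround, not the Flink fix:

```java
import java.util.Collections;
import java.util.List;

// Illustrative only: one list-state flavor may hand back null for "empty",
// the other an empty iterable. Normalizing null to an empty iterable before
// iterating avoids the NullPointerException either way.
public class ListStateNullCheck {
    static int countElements(List<String> maybeNull) {
        Iterable<String> elements =
                (maybeNull == null) ? Collections.emptyList() : maybeNull;
        int n = 0;
        for (String ignored : elements) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(countElements(null));              // 0, no NPE
        System.out.println(countElements(List.of("a", "b"))); // 2
    }
}
```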
[GitHub] flink issue #5847: [FLINK-9174][datastream]Fix the type of state created in ...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5847 @aljoscha Thanks for your review, I have addressed your comments. ---
[jira] [Commented] (FLINK-9202) AvroSerializer should not be serializing the target Avro type class
[ https://issues.apache.org/jira/browse/FLINK-9202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458728#comment-16458728 ] Stephan Ewen commented on FLINK-9202: - I think the problem is different: The config snapshots should not be serializing serializers. The serializer itself is perfectly correct to hold the avro type class. I would either rename this issue or close it and create the proper issue as part of the state evolution umbrella issues. > AvroSerializer should not be serializing the target Avro type class > --- > > Key: FLINK-9202 > URL: https://issues.apache.org/jira/browse/FLINK-9202 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > The {{AvroSerializer}} contains this field which is written when the > serializer is written into savepoints: > [https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L78] > This causes Avro schema evolution to not work properly, because Avro > generated classes have non-fixed serialVersionUIDs. Once a new Avro class is > generated with a new schema, that class can not be loaded on restore due to > incompatible UIDs, and thus the serializer can not be successfully > deserialized. > A possible solution would be to only write the classname, and dynamically > load the class into a transient field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
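The "possible solution" mentioned in the ticket, writing only the class name and loading the class lazily into a transient field, could look roughly like this. The class is an illustrative sketch, not the actual AvroSerializer code:

```java
// Hypothetical sketch: persist only the class *name*, so changing
// serialVersionUIDs of regenerated Avro classes cannot break Java
// deserialization; the Class object is resolved on demand and never written.
public class ClassNameSnapshotSketch {
    private final String typeClassName;   // this string is what gets serialized
    private transient Class<?> typeClass; // resolved lazily, never serialized

    public ClassNameSnapshotSketch(Class<?> type) {
        this.typeClassName = type.getName();
    }

    public Class<?> getTypeClass() {
        if (typeClass == null) {
            try {
                typeClass = Class.forName(typeClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(
                        "type class not on classpath: " + typeClassName, e);
            }
        }
        return typeClass;
    }

    public static void main(String[] args) {
        ClassNameSnapshotSketch snapshot = new ClassNameSnapshotSketch(String.class);
        System.out.println(snapshot.getTypeClass().getSimpleName()); // String
    }
}
```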
[jira] [Commented] (FLINK-9234) Commons Logging is missing from shaded Flink Table library
[ https://issues.apache.org/jira/browse/FLINK-9234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458725#comment-16458725 ] ASF GitHub Bot commented on FLINK-9234: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 Can we actually get rid of `commons-configuration` in the table API? All the commons packages with their weird long tail of not properly declared dependencies have become a bit of an anti-pattern to me over time... > Commons Logging is missing from shaded Flink Table library > -- > > Key: FLINK-9234 > URL: https://issues.apache.org/jira/browse/FLINK-9234 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.2 > Environment: jdk1.8.0_172 > flink 1.4.2 > Mac High Sierra >Reporter: Eron Wright >Assignee: Timo Walther >Priority: Blocker > Attachments: repro.scala > > > The flink-table shaded library seems to be missing some classes from > {{org.apache.commons.logging}} that are required by > {{org.apache.commons.configuration}}. Ran into the problem while using the > external catalog support, on Flink 1.4.2. 
> See the attached repro, which produces:
> {code}
> Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/shaded/org/apache/commons/logging/Log
> at org.apache.flink.table.catalog.ExternalTableSourceUtil$.parseScanPackagesFromConfigFile(ExternalTableSourceUtil.scala:153)
> at org.apache.flink.table.catalog.ExternalTableSourceUtil$.<init>(ExternalTableSourceUtil.scala:55)
> at org.apache.flink.table.catalog.ExternalTableSourceUtil$.<clinit>(ExternalTableSourceUtil.scala)
> at org.apache.flink.table.catalog.ExternalCatalogSchema.getTable(ExternalCatalogSchema.scala:78)
> at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:82)
> at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:256)
> at org.apache.calcite.jdbc.CalciteSchema$SchemaPlusImpl.getTable(CalciteSchema.java:561)
> at org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:497)
> at org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485)
> at Repro$.main(repro.scala:17)
> at Repro.main(repro.scala)
> {code}
> Dependencies:
> {code}
> compile 'org.slf4j:slf4j-api:1.7.25'
> compile 'org.slf4j:slf4j-log4j12:1.7.25'
> runtime 'log4j:log4j:1.2.17'
> compile 'org.apache.flink:flink-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-streaming-scala_2.11:1.4.2'
> compile 'org.apache.flink:flink-clients_2.11:1.4.2'
> compile 'org.apache.flink:flink-table_2.11:1.4.2'
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 Can we actually get rid of `commons-configuration` in the table API? All the commons packages with their weird long tail of not properly declared dependencies have become a bit of an anti-pattern to me over time... ---
[jira] [Commented] (FLINK-9250) JoinTaskExternalITCase deadlocks on travis
[ https://issues.apache.org/jira/browse/FLINK-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458723#comment-16458723 ] Stephan Ewen commented on FLINK-9250: - I think this is not actually deadlocked. The test is not distributed, and the stack trace indicates the thread is actually running at the point when the test fails. The test timed out: it must either have been incredibly slow for some reason, or the clock jumped. In any case, I would close this as a non-issue at this point. Please close if you agree.
> JoinTaskExternalITCase deadlocks on travis
> --
>
> Key: FLINK-9250
> URL: https://issues.apache.org/jira/browse/FLINK-9250
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/368995097
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5924 Thanks, merging this... ---
[jira] [Commented] (FLINK-9268) RocksDB errors from WindowOperator
[ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458699#comment-16458699 ] Stephan Ewen commented on FLINK-9268: - For the short run, it would be great to have a better exception message here. For the long run, I think we need to contribute a {{ByteBuffer}}-style interface to RocksDB.
> RocksDB errors from WindowOperator
> -
>
> Key: FLINK-9268
> URL: https://issues.apache.org/jira/browse/FLINK-9268
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> The job has no sinks, one Kafka source, does windowing based on session and uses processing time. The job fails with the error given below after running for a few hours. The only way to recover from this error is to cancel the job and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=90
> checkpoint.timeout=30
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
> at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458692#comment-16458692 ] ASF GitHub Bot commented on FLINK-9269: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5934 Concerning serializer snapshots: - We need to move away from Java-serializing the serializers into the config snapshots anyway and should do that in the near future. - I think the config snapshot should be created once when the state is created, encoded as `byte[]`, and then we only write the bytes. That saves us from repeated work on every checkpoint and would also prevent concurrent access to the serializer for creating the snapshot.
> Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
> -
>
> Key: FLINK-9269
> URL: https://issues.apache.org/jira/browse/FLINK-9269
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
>
> {code:java}
> @Nonnull
> @Override
> protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
>     // do something
>     long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
>     for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
>         int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
>         keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
>         outView.writeInt(keyGroupId);
>         for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
>             // do something
>         }
>     }
>     // do something
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5934 Concerning serializer snapshots: - We need to move away from Java-serializing the serializers into the config snapshots anyway and should do that in the near future. - I think the config snapshot should be created once when the state is created, encoded as `byte[]`, and then we only write the bytes. That saves us from repeated work on every checkpoint and would also prevent concurrent access to the serializer for creating the snapshot. ---
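Stephan's suggestion, encoding the config snapshot to bytes once at state creation and reusing the cached bytes on every checkpoint, can be sketched as below. This is an illustrative stand-in (a string playing the role of the snapshot), not Flink code:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the snapshot is encoded exactly once, in the
// constructor, so checkpoints only read an immutable byte[] and no thread
// ever touches the serializer concurrently while snapshotting.
public class CachedConfigSnapshot {
    private final byte[] snapshotBytes;

    public CachedConfigSnapshot(String configSnapshot) {
        // encoded once at state-creation time
        this.snapshotBytes = configSnapshot.getBytes(StandardCharsets.UTF_8);
    }

    /** Every checkpoint just writes the same pre-encoded bytes. */
    public byte[] checkpointPayload() {
        return snapshotBytes;
    }

    public static void main(String[] args) {
        CachedConfigSnapshot snapshot = new CachedConfigSnapshot("serializer-config-v1");
        System.out.println(snapshot.checkpointPayload().length);
    }
}
```

Since the array is created once and never mutated, sharing it across checkpoint threads is safe without synchronization.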
[jira] [Commented] (FLINK-9270) Upgrade RocksDB to 5.11.3, and resolve concurrent test invocation problem of @RetryOnFailure
[ https://issues.apache.org/jira/browse/FLINK-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458687#comment-16458687 ] Stephan Ewen commented on FLINK-9270: - Do we know why the merge operator regressed again? Was there a problem with the current implementation? > Upgrade RocksDB to 5.11.3, and resolve concurrent test invocation problem of > @RetryOnFailure > > > Key: FLINK-9270 > URL: https://issues.apache.org/jira/browse/FLINK-9270 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Upgrade RocksDB to 5.11.3 to take latest bug fixes > Besides, I found that unit tests annotated with {{@RetryOnFailure}} will be > run concurrently if there's only {{try}} clause without a {{catch}} > following. For example, sometimes, > {{RocksDBPerformanceTest.testRocksDbMergePerformance()}} will actually be > running in 3 concurrent invocations, and multiple concurrent write to RocksDB > result in errors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9271) flink-1.4.2-bin-scala_2.11.tgz is not in gzip format
[ https://issues.apache.org/jira/browse/FLINK-9271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458684#comment-16458684 ] Stephan Ewen commented on FLINK-9271: - I just tried this and get {code} file flink-1.4.2-bin-scala_2.11.tgz.1 flink-1.4.2-bin-scala_2.11.tgz.1: gzip compressed data {code} How did you get that particular file? > flink-1.4.2-bin-scala_2.11.tgz is not in gzip format > > > Key: FLINK-9271 > URL: https://issues.apache.org/jira/browse/FLINK-9271 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.2 >Reporter: Martin Grigorov >Priority: Minor > > Hi, > I've just downloaded "Flink Without Hadoop" from > [http://flink.apache.org/downloads.html.] > The name of the downloaded file is "flink-1.4.2-bin-scala_2.11.tgz" but > trying to unpack it fails with: > {code} > tar zxvf flink-1.4.2-bin-scala_2.11.tgz > gzip: stdin: not in gzip format > tar: Child returned status 1 > tar: Error is not recoverable: exiting now > {code} > {code} > file flink-1.4.2-bin-scala_2.11.tgz > > > flink-1.4.2-bin-scala_2.11.tgz: POSIX tar archive (GNU) > {code} > I'd suggest to rename the artefact to flink-1.4.2-bin-scala_2.11.*tar* to > make it more clear what is inside and how to unpack it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9273) Class cast exception
[ https://issues.apache.org/jira/browse/FLINK-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458672#comment-16458672 ] Stephan Ewen commented on FLINK-9273: - Can you share the program that caused this exception? At a first glance, this looks like there is something wrong in your application, like using a raw type or doing a wrong unchecked cast from a {{StreamSource}} to a {{DataStream}} or so. > Class cast exception > > > Key: FLINK-9273 > URL: https://issues.apache.org/jira/browse/FLINK-9273 > Project: Flink > Issue Type: Bug > Components: DataStream API, Streaming, Table API SQL >Affects Versions: 1.5.0 >Reporter: Bob Lau >Priority: Major > > Exception stack is as follows: > org.apache.flink.runtime.client.JobExecutionException: > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Long > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:621) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121) > at > com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:385) > at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Long > at > org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:630) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:583) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:396) > at > 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:307) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > ... 1 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
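Stephan's diagnosis above points at type erasure: a raw type or an unchecked cast lets a value of the wrong type into a stream typed as `Long`, and the `ClassCastException` only surfaces later, inside `LongSerializer.copy`. A minimal, self-contained sketch of the failure mode (plain Java collections, no Flink; the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastDemo {
    // A raw-typed helper that silently mixes element types, similar to what
    // a raw type or a wrong unchecked cast can do in a Flink program.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List buildRaw() {
        List values = new ArrayList();
        values.add(1L);
        values.add("not a long"); // a String sneaks into a "Long" list
        return values;
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<Long> longs = buildRaw(); // unchecked: only a compiler warning here
        long sum = 0;
        for (Object o : longs) {
            try {
                // The failure surfaces at use time -- analogous to
                // LongSerializer.copy failing inside the operator chain.
                sum += (Long) o;
            } catch (ClassCastException e) {
                System.out.println("ClassCastException: " + e.getMessage());
            }
        }
        System.out.println("sum=" + sum);
    }
}
```

The compiler only warns at the unchecked assignment; the cast that fails happens at the point of use, which is why the stack trace points at the serializer rather than at the user code that mixed the types.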
[jira] [Commented] (FLINK-9214) YarnClient should be stopped in YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal
[ https://issues.apache.org/jira/browse/FLINK-9214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458669#comment-16458669 ] ASF GitHub Bot commented on FLINK-9214: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5892 Thanks, looks good, merging this... > YarnClient should be stopped in > YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal > > > Key: FLINK-9214 > URL: https://issues.apache.org/jira/browse/FLINK-9214 > Project: Flink > Issue Type: Test >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnClusterInternal > creates YarnClient without stopping it at the end of the test. > YarnClient yc should be stopped before returning. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
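The fix suggested above is mechanical: stop the client in a `finally` block (or via try-with-resources) so it is released even when an assertion fails mid-test. A hedged sketch with a stand-in class, since the real `org.apache.hadoop.yarn.client.api.YarnClient` is not on the classpath here:

```java
// Hypothetical stand-in for a YarnClient; only the lifecycle is modeled.
class FakeYarnClient implements AutoCloseable {
    boolean started, stopped;
    void start() { started = true; }
    void stop()  { stopped = true; }
    @Override public void close() { stop(); }
}

public class YarnClientLifecycleDemo {
    public static void main(String[] args) {
        FakeYarnClient yc = new FakeYarnClient();
        yc.start();
        try {
            // ... interact with the cluster; test assertions may throw ...
        } finally {
            yc.stop(); // always stop, even if the test body fails
        }
        System.out.println("stopped=" + yc.stopped);
    }
}
```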
[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5928 The configuration (`config.md`) should be generated from the config options by now, so it should not be edited manually. (@zentol could you chime in here?) ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r185021293 --- Diff: docs/dev/stream/state/checkpointing.md --- @@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... -- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. +- `state.checkpoints.dir`: The target directory for storing checkpoints data files and meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. --- End diff -- Yes, `file:///` is what you use for many NAS style storage systems, so it is not local-only. Let's change this to say that the storage path must be accessible from all participating processes/nodes, i.e., all TaskManagers and JobManagers ---
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458620#comment-16458620 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/5327 @twalthr Hi, thanks for your review. I have updated the PR according to your suggestions. Changes mainly include: - Remove changes about UpsertSink - Refactor test case names and add more tests to cover code paths - Add more method comments - Add another base class `NonWindowOuterJoinWithNonEquiPredicates` and move corresponding variables and functions into it. - Split `CRowWrappingMultiOutputCollector` into `CRowWrappingMultiOutputCollector` and `LazyOutputCollector`. Best, Hequn. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9261) Regression - Flink CLI and Web UI not working when SSL is enabled
[ https://issues.apache.org/jira/browse/FLINK-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458622#comment-16458622 ] Stephan Ewen commented on FLINK-9261: - Chesnay's suggestion may fix this, by enabling / disabling REST security separately from on-the-wire encryption. For a clean solution, something more is needed, because clients now communicate with the server via REST, and I think users would very much like the client to be authenticated as well when submitting something to a Flink server. The configuration posted above affects the RPC and data plane, meaning: - Previously (Flink 1.4) it covered TM/TM, JM/TM, and Client/JM communication. - Now (Flink 1.5+), it affects only TM/TM and JM/TM. > Regression - Flink CLI and Web UI not working when SSL is enabled > - > > Key: FLINK-9261 > URL: https://issues.apache.org/jira/browse/FLINK-9261 > Project: Flink > Issue Type: Bug > Components: Client, Network, Web Client >Affects Versions: 1.5.0 >Reporter: Edward Rojas >Priority: Blocker > Labels: regression > Fix For: 1.5.0 > > > When *security.ssl.enabled* config is set to true, Web UI is no longer > reachable; there are no logs on the jobmanager. 
> > When setting *web.ssl.enabled* to false (keeping security.ssl.enabled to > true), the dashboard is not reachable and there is the following exception on > jobmanager: > {code:java} > WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - > Unhandled exception > org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: > not an SSL/TLS record: > 474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a383038310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a557067726164652d496e7365637572652d52657175657374733a20310d0a557365722d4167656e743a204d6f7a696c6c612f352e3020284d6163696e746f73683b20496e74656c204d6163204f5320582031305f31335f3329204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f36352e302e32352e313831205361666172692f3533372e33360d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c696d6167652f776562702c696d6167652f61706e672c2a2f2a3b713d302e380d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2c656e2d47423b713d302e392c65732d3431393b713d302e382c65733b713d302e372c66722d46523b713d302e362c66723b713d302e350d0a436f6f6b69653a20496465612d39326365626136363d39396464633637632d613838382d346439332d396166612d3737396631373636326264320d0a49662d4d6f6469666965642d53696e63653a205468752c2032362041707220323031382031313a30313a313520474d540d0a0d0a > at > org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:940) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) > {code} > Also when trying to use the Flink CLI, it gets stuck on "Waiting for > response..." and there are no error messages on the jobmanager. None of the > commands work: list, run, etc. > > TaskManagers are able to register with the JobManager, so the SSL configuration > is good. > > SSL configuration: > security.ssl.enabled: true > security.ssl.keystore: /path/to/keystore >
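The hex blob in the `NotSslRecordException` above is simply the raw bytes Netty received: a plaintext HTTP request arriving on a TLS endpoint. Decoding the first bytes makes that visible (a small stand-alone helper; `HexPeek` is a hypothetical name):

```java
public class HexPeek {
    // Decode a hex string into its ASCII characters, two hex digits per byte.
    static String decode(String hex) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i + 1 < hex.length(); i += 2) {
            sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // First bytes of the record dumped by the exception above
        String prefix = "474554202f20485454502f312e310d0a";
        System.out.println(decode(prefix)); // prints "GET / HTTP/1.1" followed by CRLF
    }
}
```

Seeing `GET / HTTP/1.1` confirms a non-TLS client (here, a browser) talking to the SSL-enabled REST endpoint, which matches the reported symptom.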
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458614#comment-16458614 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995939 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStateBackend(getStateBackend) +StreamITCase.clear + +val data1 = new mutable.MutableList[(Int, Long, String)] +data1.+=((1, 1L, "Hi1")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 2L, "Hi2")) +data1.+=((1, 5L, "Hi3")) +data1.+=((2, 7L, "Hi5")) +data1.+=((1, 9L, "Hi6")) +data1.+=((1, 8L, "Hi8")) +data1.+=((3, 8L, "Hi9")) + +val data2 = new mutable.MutableList[(Int, Long, String)] +data2.+=((1, 1L, "HiHi")) +data2.+=((2, 2L, "HeHe")) +data2.+=((3, 2L, "HeHe")) + +val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) +val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + +tEnv.registerTable("T1", t1) +tEnv.registerTable("T2", t2) + +val sqlQuery = + """ +|SELECT t2.a, t2.c, t1.c +|FROM T1 as t1 JOIN T2 as t2 ON +| t1.a = t2.a AND +| t1.b > t2.b +|""".stripMargin + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + +val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTable("Table3", ds1) +tEnv.registerTable("Table5", ds2) + +val result = tEnv.sqlQuery(sqlQuery) + +val expected = 
Seq("Hi,Hallo") +val results = result.toRetractStream[Row] +results.addSink(new StreamITCase.RetractingSink) +env.execute() +assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear +env.setStateBackend(getStateBackend) + +val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + +val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) +val ds2 =
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458610#comment-16458610 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Base class for stream non-window outer Join. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function name of the other non-equi condition + * @param genJoinFuncCode the function code of the other non-equi condition + * @param isLeftJoin the type of join, whether it is the type of left join + * @param queryConfig the configuration for the query to generate + */ +abstract class NonWindowOuterJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +resultType: TypeInformation[CRow], +genJoinFuncName: String, +genJoinFuncCode: String, +isLeftJoin: Boolean, +queryConfig: StreamQueryConfig) + extends NonWindowJoin( +leftType, +rightType, +resultType, +genJoinFuncName, +genJoinFuncCode, +queryConfig) { + + // result row, all fields from right will be null. Used for output when there are no matched rows. + protected var leftResultRow: Row = _ + // result row, all fields from left will be null. Used for output when there are no matched rows. + protected var rightResultRow: Row = _ + // how many matched rows from the right table for each left row. Index 0 is used for left + // stream, index 1 is used for right stream. 
+ protected var joinCntState: Array[MapState[Row, Long]] = _ + + override def open(parameters: Configuration): Unit = { +super.open(parameters) + +leftResultRow = new Row(resultType.getArity) +rightResultRow = new Row(resultType.getArity) + +joinCntState = new Array[MapState[Row, Long]](2) +val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor) +val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long]( + "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]]) +joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor) + +LOG.debug(s"Instantiating NonWindowOuterJoin") + } + + /** +* Join current row with other side rows. Preserve current row if there are no matched rows +* from other side. +*/ + def preservedJoin( + inputRow: Row, + inputRowFromLeft: Boolean, + otherSideState: MapState[Row, JTuple2[Long, Long]], + curProcessTime: Long): Long = { + +val otherSideIterator = otherSideState.iterator() +while (otherSideIterator.hasNext) { + val otherSideEntry = otherSideIterator.next() + val
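The `joinCntState` bookkeeping quoted above counts, for each row, how many rows matched on the other side; a left row with a zero count is preserved by emitting it padded with nulls. A rough batch-style sketch of that idea using plain `HashMap`s in place of Flink's `MapState` (all names hypothetical, equi-join on the key only):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftOuterJoinSketch {
    static List<String> leftOuterJoin(Map<Integer, String> left, Map<Integer, String> right) {
        List<String> out = new ArrayList<>();
        Map<Integer, Long> joinCnt = new HashMap<>(); // stands in for joinCntState
        for (Map.Entry<Integer, String> l : left.entrySet()) {
            long matches = 0;
            for (Map.Entry<Integer, String> r : right.entrySet()) {
                if (l.getKey().equals(r.getKey())) {   // equi-join predicate
                    out.add(l.getValue() + "," + r.getValue());
                    matches++;
                }
            }
            joinCnt.put(l.getKey(), matches);          // remember the match count
            if (matches == 0) {
                out.add(l.getValue() + ",null");       // preserve unmatched left row
            }
        }
        return out;
    }
}
```

In the streaming operator the count must live in keyed state because matches can arrive (and retract) later; this sketch only shows the emit-or-pad decision.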
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458613#comment-16458613 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) -val coMapFun = - new NonWindowInnerJoin( -leftSchema.typeInfo, -rightSchema.typeInfo, -CRowTypeInfo(returnType), -genFunction.name, -genFunction.code, -queryConfig) +val coMapFun = joinType match { + case JoinRelType.INNER => +new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => +new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- I planned to add right join in FLINK-8429. It's ok to add right join in this PR if you prefer. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458609#comment-16458609 ] ASF GitHub Bot commented on FLINK-8428: --- Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995557 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( -"UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => --- End diff -- OK. > Implement stream-stream non-window left outer join > -- > > Key: FLINK-8428 > URL: https://issues.apache.org/jira/browse/FLINK-8428 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Implement stream-stream non-window left outer join for sql/table-api. A > simple design doc can be found > [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995596

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Connects data of the left stream and the right stream. Base class for stream non-window
+ * outer joins.
+ *
+ * @param leftType        the input type of the left stream
+ * @param rightType       the input type of the right stream
+ * @param resultType      the output type of the join
+ * @param genJoinFuncName the name of the generated function for the non-equi join condition
+ * @param genJoinFuncCode the code of the generated function for the non-equi join condition
+ * @param isLeftJoin      true if this is a left outer join
+ * @param queryConfig     the configuration for the query to generate
+ */
+abstract class NonWindowOuterJoin(
+    leftType: TypeInformation[Row],
+    rightType: TypeInformation[Row],
+    resultType: TypeInformation[CRow],
+    genJoinFuncName: String,
+    genJoinFuncCode: String,
+    isLeftJoin: Boolean,
+    queryConfig: StreamQueryConfig)
+  extends NonWindowJoin(
+    leftType,
+    rightType,
+    resultType,
+    genJoinFuncName,
+    genJoinFuncCode,
+    queryConfig) {
+
+  // result row in which all fields from the right are null; used for output when there are
+  // no matched rows on the other side
+  protected var leftResultRow: Row = _
+  // result row in which all fields from the left are null; used for output when there are
+  // no matched rows on the other side
+  protected var rightResultRow: Row = _
+  // number of matched rows on the other side for each row; index 0 is used for the left
+  // stream, index 1 for the right stream
+  protected var joinCntState: Array[MapState[Row, Long]] = _
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+
+    leftResultRow = new Row(resultType.getArity)
+    rightResultRow = new Row(resultType.getArity)
+
+    joinCntState = new Array[MapState[Row, Long]](2)
+    val leftJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "leftJoinCnt", leftType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(0) = getRuntimeContext.getMapState(leftJoinCntStateDescriptor)
+    val rightJoinCntStateDescriptor = new MapStateDescriptor[Row, Long](
+      "rightJoinCnt", rightType, Types.LONG.asInstanceOf[TypeInformation[Long]])
+    joinCntState(1) = getRuntimeContext.getMapState(rightJoinCntStateDescriptor)
+
+    LOG.debug("Instantiating NonWindowOuterJoin.")
+  }
+
+  /**
+   * Joins the current row with rows from the other side. Preserves the current row if there
+   * are no matched rows on the other side.
+   */
+  def preservedJoin(
+      inputRow: Row,
+      inputRowFromLeft: Boolean,
+      otherSideState: MapState[Row, JTuple2[Long, Long]],
+      curProcessTime: Long): Long = {
+
+    val otherSideIterator = otherSideState.iterator()
+    while (otherSideIterator.hasNext) {
+      val otherSideEntry = otherSideIterator.next()
+      val otherSideRow = otherSideEntry.getKey
+      val otherSideCntAndExpiredTime = otherSideEntry.getValue
+      // join
+      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
+      callJoinFunction(inputRow, inputRowFromLeft,
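The bookkeeping in the diff above, a per-row match counter in `joinCntState` plus a null-padded result row for the preserved side, can be sketched outside Flink. The following is a simplified, hypothetical model: plain maps stand in for `MapState`, an `Int` stands in for a left `Row`, expiration times are ignored, and `Seq.fill` models `cRowWrapper.setTimes(cnt)` emitting the joined row `cnt` times.

```scala
import scala.collection.mutable

// Simplified stand-ins: an Int left row, a String right row, and a
// result row as (left, right) where None models null padding.
case class Result(left: Option[Int], right: Option[String])

object PreservedJoinSketch {
  // other-side state: right row -> occurrence count (the f0 of JTuple2 above)
  val rightState = mutable.LinkedHashMap[(Int, String), Long]()
  // joinCntState(0) analogue: left row -> number of matched right rows
  val leftJoinCnt = mutable.LinkedHashMap[Int, Long]()

  /** Joins a left row against right state; preserves it (null-padded) if nothing matches. */
  def preservedJoin(leftKey: Int): Seq[Result] = {
    val matches = rightState.toSeq.collect {
      case ((k, v), cnt) if k == leftKey =>
        // emit the joined row once per stored occurrence of the right row
        Seq.fill(cnt.toInt)(Result(Some(leftKey), Some(v)))
    }.flatten
    leftJoinCnt(leftKey) = matches.size.toLong  // remember the match count
    if (matches.isEmpty) Seq(Result(Some(leftKey), None)) // preserve the left row
    else matches
  }
}
```

The stored match count is what later tells the operator whether an arriving right row is the *first* match, in which case the earlier null-padded row has to be withdrawn.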
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458612#comment-16458612 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995668

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala ---
@@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase {
     )
     util.verifyTableTrait(resultTable, expected)
   }
-}

+  @Test
+  def testInnerJoinWithoutAgg(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, Int)]('bb, 'c)
+
+    val resultTable = lTable
+      .join(rTable)
+      .where('b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, Acc"
+        ),
+        "false, Acc"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamJoin",
+          "DataStreamScan(true, Acc)",
+          "DataStreamScan(true, Acc)",
+          "false, AccRetract"
+        ),
+        "false, AccRetract"
+      )
+    util.verifyTableTrait(resultTable, expected)
+  }
+
+  @Test
+  def testAggFollowedWithLeftJoin(): Unit = {
+    val util = streamTestForRetractionUtil()
+    val lTable = util.addTable[(Int, Int)]('a, 'b)
+    val rTable = util.addTable[(Int, String)]('bb, 'c)
+
+    val countDistinct = new CountDistinct
+    val resultTable = lTable
+      .leftOuterJoin(rTable, 'b === 'bb)
+      .select('a, 'b, 'c)
+      .groupBy('a)
+      .select('a, countDistinct('c))
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          binaryNode(
+            "DataStreamJoin",
+            "DataStreamScan(true, Acc)",
--- End diff --

`testJoin()` has covered this case.

> Implement stream-stream non-window left outer join
> --------------------------------------------------
>
>                 Key: FLINK-8428
>                 URL: https://issues.apache.org/jira/browse/FLINK-8428
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A
> simple design doc can be found
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
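The traits asserted in these tests (`Acc` around the inner join, `AccRetract` after the left outer join) reflect that an outer join must be able to retract results it emitted earlier: when the first matching right row arrives, the previously emitted null-padded row is no longer valid. A minimal, hypothetical illustration of that message pattern (the names here are invented for the sketch, not Flink API):

```scala
// A change flag like Flink's CRow change indicator.
sealed trait Change
case object Add extends Change
case object Retract extends Change

object RetractDemo {
  /** Messages emitted on a left-join output when a right row for `left` arrives. */
  def onRightArrives(hadMatchBefore: Boolean, left: Int, right: String)
      : Seq[(Change, (Int, Option[String]))] =
    if (!hadMatchBefore)
      // the earlier null-padded result (left, null) must be withdrawn first
      Seq((Retract, (left, None)), (Add, (left, Some(right))))
    else
      Seq((Add, (left, Some(right))))
}
```

An inner join never emits a row before its match exists, so it stays in `Acc` mode; the downstream `DataStreamGroupAggregate` in the last test then has to consume the `AccRetract` stream produced by the outer join.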
[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995557

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -230,8 +230,12 @@ abstract class StreamTableEnvironment(
     tableKeys match {
       case Some(keys) => upsertSink.setKeyFields(keys)
       case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
-      case None if !isAppendOnlyTable => throw new TableException(
-        "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+      case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null =>
--- End diff --

OK.

---
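The resolution order implied by the diff above is: keys inferred from the query win; an append-only table needs no keys; otherwise keys enforced by the sink are used, and failing all that the query is rejected. A standalone sketch of that rule (hypothetical model; `enforceKeyFields` is the name from the diff, the rest of the names are invented):

```scala
object KeyResolution {
  /** Returns the key fields to set on an upsert sink, or None if no keys are needed. */
  def resolveKeys(
      tableKeys: Option[Array[String]],
      isAppendOnly: Boolean,
      enforcedKeys: Option[Array[String]]): Option[Array[String]] =
    tableKeys match {
      case Some(keys) => Some(keys)          // keys inferred from the query win
      case None if isAppendOnly => None      // append-only tables need no keys
      case None => enforcedKeys match {      // fall back to sink-enforced keys
        case Some(_) => enforcedKeys
        case None => throw new IllegalStateException(
          "UpsertStreamTableSink requires a table with full primary keys if it is updated.")
      }
    }
}
```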
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458607#comment-16458607 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995438

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---

[quoted diff elided; identical to the NonWindowOuterJoin.scala diff quoted above, and the review comment itself is truncated in the original]
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458606#comment-16458606 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995228

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---

[quoted diff elided; identical to the NonWindowOuterJoin.scala diff quoted above, and the review comment itself is truncated in the original]
[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join
[ https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458603#comment-16458603 ]

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184994673

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala ---

[quoted diff elided; identical to the NonWindowOuterJoin.scala diff quoted above, and the review comment itself is truncated in the original]