This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82f20c0a1b9c572d197598d8a8b4ec531dc4a376
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Mon Dec 9 11:31:02 2019 +0100

    [FLINK-14992] Extend test coverage of JobListenerITCase
    
    This changes the test to use a MiniClusterResource to be more efficient.
    
    This adds test coverage for the executeAsync() methods.
    
    This verifies that we call the job listeners from the main execution
    thread, which is important for some frameworks such as Zeppelin support.
---
 .../flink/test/execution/JobListenerITCase.java    | 194 ++++++++++++++++++++-
 1 file changed, 186 insertions(+), 8 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
index ad0173f..609a0ec 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java
@@ -19,34 +19,58 @@
 package org.apache.flink.test.execution;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.deployment.executors.RemoteExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Unit tests for {@link JobListener}.
  */
 public class JobListenerITCase extends TestLogger {
 
+       @ClassRule
+       public static MiniClusterWithClientResource miniClusterResource = new 
MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                                       .build());
+
+       private static Configuration getClientConfiguration() {
+               Configuration result = new 
Configuration(miniClusterResource.getClientConfiguration());
+               result.set(DeploymentOptions.TARGET, RemoteExecutor.NAME);
+               return result;
+       }
+
        @Test
-       public void testJobListenerOnBatchEnvironment() throws Exception {
+       public void testExecuteCallsJobListenerOnBatchEnvironment() throws 
Exception {
+               AtomicReference<JobID> jobIdReference = new AtomicReference<>();
                OneShotLatch submissionLatch = new OneShotLatch();
                OneShotLatch executionLatch = new OneShotLatch();
 
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               ExecutionEnvironment env = new 
ExecutionEnvironment(getClientConfiguration());
 
                env.registerJobListener(new JobListener() {
                        @Override
-                       public void onJobSubmitted(JobClient jobClient, 
Throwable throwable) {
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               jobIdReference.set(jobClient.getJobID());
                                submissionLatch.trigger();
                        }
 
@@ -57,22 +81,100 @@ public class JobListenerITCase extends TestLogger {
                });
 
                env.fromElements(1, 2, 3, 4, 5).output(new 
DiscardingOutputFormat<>());
-               env.execute();
+               JobExecutionResult jobExecutionResult = env.execute();
 
                submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
                executionLatch.await(2000L, TimeUnit.MILLISECONDS);
+
+               assertThat(jobExecutionResult.getJobID(), 
is(jobIdReference.get()));
+       }
+
+       @Test
+       public void testExecuteAsyncCallsJobListenerOnBatchEnvironment() throws 
Exception {
+               AtomicReference<JobID> jobIdReference = new AtomicReference<>();
+               OneShotLatch submissionLatch = new OneShotLatch();
+
+               ExecutionEnvironment env = new 
ExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               jobIdReference.set(jobClient.getJobID());
+                               submissionLatch.trigger();
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).output(new 
DiscardingOutputFormat<>());
+               JobClient jobClient = env.executeAsync();
+
+               submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
+               // when executing asynchronously we don't get an "executed" 
callback
+
+               assertThat(jobClient.getJobID(), is(jobIdReference.get()));
+       }
+
+       @Test
+       public void testExecuteCallsJobListenerOnMainThreadOnBatchEnvironment() 
throws Exception {
+               AtomicReference<Thread> threadReference = new 
AtomicReference<>();
+
+               ExecutionEnvironment env = new 
ExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               threadReference.set(Thread.currentThread());
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).output(new 
DiscardingOutputFormat<>());
+               env.execute();
+
+               assertThat(Thread.currentThread(), is(threadReference.get()));
+       }
+
+       @Test
+       public void 
testExecuteAsyncCallsJobListenerOnMainThreadOnBatchEnvironment() throws 
Exception {
+               AtomicReference<Thread> threadReference = new 
AtomicReference<>();
+
+               ExecutionEnvironment env = new 
ExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               threadReference.set(Thread.currentThread());
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).output(new 
DiscardingOutputFormat<>());
+               env.executeAsync();
+
+               assertThat(Thread.currentThread(), is(threadReference.get()));
        }
 
        @Test
-       public void testJobListenerOnStreamingEnvironment() throws Exception {
+       public void testExecuteCallsJobListenerOnStreamingEnvironment() throws 
Exception {
+               AtomicReference<JobID> jobIdReference = new AtomicReference<>();
                OneShotLatch submissionLatch = new OneShotLatch();
                OneShotLatch executionLatch = new OneShotLatch();
 
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamExecutionEnvironment env = new 
StreamExecutionEnvironment(getClientConfiguration());
 
                env.registerJobListener(new JobListener() {
                        @Override
-                       public void onJobSubmitted(JobClient jobClient, 
Throwable throwable) {
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               jobIdReference.set(jobClient.getJobID());
                                submissionLatch.trigger();
                        }
 
@@ -83,9 +185,85 @@ public class JobListenerITCase extends TestLogger {
                });
 
                env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>());
-               env.execute();
+               JobExecutionResult jobExecutionResult = env.execute();
 
                submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
                executionLatch.await(2000L, TimeUnit.MILLISECONDS);
+
+               assertThat(jobExecutionResult.getJobID(), 
is(jobIdReference.get()));
+       }
+
+       @Test
+       public void testExecuteAsyncCallsJobListenerOnStreamingEnvironment() 
throws Exception {
+               AtomicReference<JobID> jobIdReference = new AtomicReference<>();
+               OneShotLatch submissionLatch = new OneShotLatch();
+
+               StreamExecutionEnvironment env = new 
StreamExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               jobIdReference.set(jobClient.getJobID());
+                               submissionLatch.trigger();
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>());
+               JobClient jobClient = env.executeAsync();
+
+               submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
+               // when executing asynchronously we don't get an "executed" 
callback
+
+               assertThat(jobClient.getJobID(), is(jobIdReference.get()));
+       }
+
+       @Test
+       public void 
testExecuteCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception {
+               AtomicReference<Thread> threadReference = new 
AtomicReference<>();
+
+               StreamExecutionEnvironment env = new 
StreamExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               threadReference.set(Thread.currentThread());
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>());
+               env.execute();
+
+               assertThat(Thread.currentThread(), is(threadReference.get()));
+       }
+
+       @Test
+       public void 
testExecuteAsyncCallsJobListenerOnMainThreadOnStreamEnvironment() throws 
Exception {
+               AtomicReference<Thread> threadReference = new 
AtomicReference<>();
+
+               StreamExecutionEnvironment env = new 
StreamExecutionEnvironment(getClientConfiguration());
+
+               env.registerJobListener(new JobListener() {
+                       @Override
+                       public void onJobSubmitted(JobClient jobClient, 
Throwable t) {
+                               threadReference.set(Thread.currentThread());
+                       }
+
+                       @Override
+                       public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) {
+                       }
+               });
+
+               env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>());
+               env.executeAsync();
+
+               assertThat(Thread.currentThread(), is(threadReference.get()));
        }
 }

Reply via email to