This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 82f20c0a1b9c572d197598d8a8b4ec531dc4a376 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Mon Dec 9 11:31:02 2019 +0100 [FLINK-14992] Extend test coverage of JobListenerITCase This changes the test to use a MiniClusterResource to be more efficient. This adds test coverage for the executeAsync() methods. This verifies that we call the job listeners from the main execution thread, which is important for supporting some frameworks such as Zeppelin. --- .../flink/test/execution/JobListenerITCase.java | 194 ++++++++++++++++++++- 1 file changed, 186 insertions(+), 8 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java index ad0173f..609a0ec 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/execution/JobListenerITCase.java @@ -19,34 +19,58 @@ package org.apache.flink.test.execution; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.client.deployment.executors.RemoteExecutor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.JobListener; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; import org.junit.Test; import
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; /** * Unit tests for {@link JobListener}. */ public class JobListenerITCase extends TestLogger { + @ClassRule + public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .build()); + + private static Configuration getClientConfiguration() { + Configuration result = new Configuration(miniClusterResource.getClientConfiguration()); + result.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); + return result; + } + @Test - public void testJobListenerOnBatchEnvironment() throws Exception { + public void testExecuteCallsJobListenerOnBatchEnvironment() throws Exception { + AtomicReference<JobID> jobIdReference = new AtomicReference<>(); OneShotLatch submissionLatch = new OneShotLatch(); OneShotLatch executionLatch = new OneShotLatch(); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + ExecutionEnvironment env = new ExecutionEnvironment(getClientConfiguration()); env.registerJobListener(new JobListener() { @Override - public void onJobSubmitted(JobClient jobClient, Throwable throwable) { + public void onJobSubmitted(JobClient jobClient, Throwable t) { + jobIdReference.set(jobClient.getJobID()); submissionLatch.trigger(); } @@ -57,22 +81,100 @@ public class JobListenerITCase extends TestLogger { }); env.fromElements(1, 2, 3, 4, 5).output(new DiscardingOutputFormat<>()); - env.execute(); + JobExecutionResult jobExecutionResult = env.execute(); submissionLatch.await(2000L, TimeUnit.MILLISECONDS); executionLatch.await(2000L, TimeUnit.MILLISECONDS); + + assertThat(jobExecutionResult.getJobID(), is(jobIdReference.get())); + } + + @Test + public void testExecuteAsyncCallsJobListenerOnBatchEnvironment() throws Exception { + AtomicReference<JobID> jobIdReference = new 
AtomicReference<>(); + OneShotLatch submissionLatch = new OneShotLatch(); + + ExecutionEnvironment env = new ExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + jobIdReference.set(jobClient.getJobID()); + submissionLatch.trigger(); + } + + @Override + public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).output(new DiscardingOutputFormat<>()); + JobClient jobClient = env.executeAsync(); + + submissionLatch.await(2000L, TimeUnit.MILLISECONDS); + // when executing asynchronously we don't get an "executed" callback + + assertThat(jobClient.getJobID(), is(jobIdReference.get())); + } + + @Test + public void testExecuteCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception { + AtomicReference<Thread> threadReference = new AtomicReference<>(); + + ExecutionEnvironment env = new ExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + threadReference.set(Thread.currentThread()); + } + + @Override + public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).output(new DiscardingOutputFormat<>()); + env.execute(); + + assertThat(Thread.currentThread(), is(threadReference.get())); + } + + @Test + public void testExecuteAsyncCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception { + AtomicReference<Thread> threadReference = new AtomicReference<>(); + + ExecutionEnvironment env = new ExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + threadReference.set(Thread.currentThread()); + } + + @Override + public void onJobExecuted(JobExecutionResult 
jobExecutionResult, Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).output(new DiscardingOutputFormat<>()); + env.executeAsync(); + + assertThat(Thread.currentThread(), is(threadReference.get())); } @Test - public void testJobListenerOnStreamingEnvironment() throws Exception { + public void testExecuteCallsJobListenerOnStreamingEnvironment() throws Exception { + AtomicReference<JobID> jobIdReference = new AtomicReference<>(); OneShotLatch submissionLatch = new OneShotLatch(); OneShotLatch executionLatch = new OneShotLatch(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = new StreamExecutionEnvironment(getClientConfiguration()); env.registerJobListener(new JobListener() { @Override - public void onJobSubmitted(JobClient jobClient, Throwable throwable) { + public void onJobSubmitted(JobClient jobClient, Throwable t) { + jobIdReference.set(jobClient.getJobID()); submissionLatch.trigger(); } @@ -83,9 +185,85 @@ public class JobListenerITCase extends TestLogger { }); env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>()); - env.execute(); + JobExecutionResult jobExecutionResult = env.execute(); submissionLatch.await(2000L, TimeUnit.MILLISECONDS); executionLatch.await(2000L, TimeUnit.MILLISECONDS); + + assertThat(jobExecutionResult.getJobID(), is(jobIdReference.get())); + } + + @Test + public void testExecuteAsyncCallsJobListenerOnStreamingEnvironment() throws Exception { + AtomicReference<JobID> jobIdReference = new AtomicReference<>(); + OneShotLatch submissionLatch = new OneShotLatch(); + + StreamExecutionEnvironment env = new StreamExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + jobIdReference.set(jobClient.getJobID()); + submissionLatch.trigger(); + } + + @Override + public void onJobExecuted(JobExecutionResult jobExecutionResult, 
Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>()); + JobClient jobClient = env.executeAsync(); + + submissionLatch.await(2000L, TimeUnit.MILLISECONDS); + // when executing asynchronously we don't get an "executed" callback + + assertThat(jobClient.getJobID(), is(jobIdReference.get())); + } + + @Test + public void testExecuteCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception { + AtomicReference<Thread> threadReference = new AtomicReference<>(); + + StreamExecutionEnvironment env = new StreamExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + threadReference.set(Thread.currentThread()); + } + + @Override + public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>()); + env.execute(); + + assertThat(Thread.currentThread(), is(threadReference.get())); + } + + @Test + public void testExecuteAsyncCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception { + AtomicReference<Thread> threadReference = new AtomicReference<>(); + + StreamExecutionEnvironment env = new StreamExecutionEnvironment(getClientConfiguration()); + + env.registerJobListener(new JobListener() { + @Override + public void onJobSubmitted(JobClient jobClient, Throwable t) { + threadReference.set(Thread.currentThread()); + } + + @Override + public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) { + } + }); + + env.fromElements(1, 2, 3, 4, 5).addSink(new DiscardingSink<>()); + env.executeAsync(); + + assertThat(Thread.currentThread(), is(threadReference.get())); } }