Author: xuefu Date: Tue Jan 20 19:21:31 2015 New Revision: 1653341 URL: http://svn.apache.org/r1653341 Log: HIVE-9179: Add listeners on JobHandle so job status change can be notified to the client [Spark Branch] (Marcelo via Xuefu)
Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java Modified: hive/branches/spark/spark-client/pom.xml hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Modified: hive/branches/spark/spark-client/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/pom.xml?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/pom.xml (original) +++ hive/branches/spark/spark-client/pom.xml Tue Jan 20 19:21:31 2015 @@ -65,6 +65,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original) +++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Tue Jan 20 19:21:31 2015 @@ -121,6 +121,20 @@ abstract class BaseProtocol extends RpcD } + protected static 
class JobStarted implements Serializable { + + final String id; + + JobStarted(String id) { + this.id = id; + } + + JobStarted() { + this(null); + } + + } + /** * Inform the client that a new spark job has been submitted for the client job. */ Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original) +++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Tue Jan 20 19:21:31 2015 @@ -55,4 +55,53 @@ public interface JobHandle<T extends Ser */ SparkCounters getSparkCounters(); + /** + * Return the current state of the job. + */ + State getState(); + + /** + * Add a listener to the job handle. If the job's state is not SENT, a callback for the + * corresponding state will be invoked immediately. + * + * @param l The listener to add. + */ + void addListener(Listener<T> l); + + /** + * The current state of the submitted job. + */ + static enum State { + SENT, + QUEUED, + STARTED, + CANCELLED, + FAILED, + SUCCEEDED; + } + + /** + * A listener for monitoring the state of the job in the remote context. Callbacks are called + * when the corresponding state change occurs. + */ + static interface Listener<T extends Serializable> { + + void onJobQueued(JobHandle<T> job); + + void onJobStarted(JobHandle<T> job); + + void onJobCancelled(JobHandle<T> job); + + void onJobFailed(JobHandle<T> job, Throwable cause); + + void onJobSucceeded(JobHandle<T> job, T result); + + /** + * Called when a monitored Spark job is started on the remote context. This callback + * does not indicate a state change in the client job's status. 
+ */ + void onSparkJobStarted(JobHandle<T> job, int sparkJobId); + + } + } Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original) +++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Tue Jan 20 19:21:31 2015 @@ -17,15 +17,16 @@ package org.apache.hive.spark.client; -import io.netty.util.concurrent.Promise; - import java.io.Serializable; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import io.netty.util.concurrent.Promise; import org.apache.hive.spark.counter.SparkCounters; @@ -34,28 +35,30 @@ import org.apache.hive.spark.counter.Spa */ class JobHandleImpl<T extends Serializable> implements JobHandle<T> { - private final AtomicBoolean cancelled; private final SparkClientImpl client; private final String jobId; private final MetricsCollection metrics; private final Promise<T> promise; private final List<Integer> sparkJobIds; + private final List<Listener> listeners; + private volatile State state; private volatile SparkCounters sparkCounters; JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) { - this.cancelled = new AtomicBoolean(); this.client = client; this.jobId = jobId; this.promise = promise; + this.listeners = Lists.newLinkedList(); this.metrics = new MetricsCollection(); this.sparkJobIds = new 
CopyOnWriteArrayList<Integer>(); + this.state = State.SENT; this.sparkCounters = null; } /** Requests a running job to be cancelled. */ @Override public boolean cancel(boolean mayInterrupt) { - if (cancelled.compareAndSet(false, true)) { + if (changeState(State.CANCELLED)) { client.cancel(jobId); promise.cancel(mayInterrupt); return true; @@ -114,20 +117,116 @@ class JobHandleImpl<T extends Serializab return sparkCounters; } + @Override + public State getState() { + return state; + } + + @Override + public void addListener(Listener l) { + synchronized (listeners) { + listeners.add(l); + // If current state is a final state, notify of Spark job IDs before notifying about the + // state transition. + if (state.ordinal() >= State.CANCELLED.ordinal()) { + for (Integer i : sparkJobIds) { + l.onSparkJobStarted(this, i); + } + } + + fireStateChange(state, l); + + // Otherwise, notify about Spark jobs after the state notification. + if (state.ordinal() < State.CANCELLED.ordinal()) { + for (Integer i : sparkJobIds) { + l.onSparkJobStarted(this, i); + } + } + } + } + public void setSparkCounters(SparkCounters sparkCounters) { this.sparkCounters = sparkCounters; } @SuppressWarnings("unchecked") void setSuccess(Object result) { - promise.setSuccess((T) result); + // The synchronization here is not necessary, but tests depend on it. + synchronized (listeners) { + promise.setSuccess((T) result); + changeState(State.SUCCEEDED); + } } void setFailure(Throwable error) { - promise.setFailure(error); + // The synchronization here is not necessary, but tests depend on it. + synchronized (listeners) { + promise.setFailure(error); + changeState(State.FAILED); + } + } + + /** + * Changes the state of this job handle, making sure that illegal state transitions are ignored. + * Fires events appropriately. + * + * As a rule, state transitions can only occur if the new state is "higher" than the current + * state (i.e., has a higher ordinal number) and is not a "final" state. 
"Final" states are + * CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an ordinal number higher + * than the CANCELLED enum constant. + */ + boolean changeState(State newState) { + synchronized (listeners) { + if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) { + state = newState; + for (Listener l : listeners) { + fireStateChange(newState, l); + } + return true; + } + return false; + } + } + + void addSparkJobId(int sparkJobId) { + synchronized (listeners) { + sparkJobIds.add(sparkJobId); + for (Listener l : listeners) { + l.onSparkJobStarted(this, sparkJobId); + } + } + } + + private void fireStateChange(State s, Listener l) { + switch (s) { + case SENT: + break; + case QUEUED: + l.onJobQueued(this); + break; + case STARTED: + l.onJobStarted(this); + break; + case CANCELLED: + l.onJobCancelled(this); + break; + case FAILED: + l.onJobFailed(this, promise.cause()); + break; + case SUCCEEDED: + try { + l.onJobSucceeded(this, promise.get()); + } catch (Exception e) { + // Shouldn't really happen. + throw new IllegalStateException(e); + } + break; + default: + throw new IllegalStateException(); + } } - /** Last attempt resort at preventing stray jobs from accumulating in SparkClientImpl. */ + /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. 
*/ @Override protected void finalize() { if (!isDone()) { Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original) +++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Tue Jan 20 19:21:31 2015 @@ -245,6 +245,10 @@ public class RemoteDriver { clientRpc.call(new JobResult(jobId, result, error, counters)); } + void jobStarted(String jobId) { + clientRpc.call(new JobStarted(jobId)); + } + void jobSubmitted(String jobId, int sparkJobId) { LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId); clientRpc.call(new JobSubmitted(jobId, sparkJobId)); @@ -325,6 +329,8 @@ public class RemoteDriver { @Override public Void call() throws Exception { + protocol.jobStarted(req.id); + try { jc.setMonitorCb(new MonitorCallback() { @Override Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original) +++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Jan 20 19:21:31 2015 @@ -382,7 +382,7 @@ class SparkClientImpl implements SparkCl <T extends Serializable> JobHandleImpl<T> submit(Job<T> job) { final String jobId = UUID.randomUUID().toString(); final Promise<T> promise = 
driverRpc.createPromise(); - JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId); + final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId); jobs.put(jobId, handle); final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job)); @@ -393,7 +393,9 @@ class SparkClientImpl implements SparkCl rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() { @Override public void operationComplete(io.netty.util.concurrent.Future<Void> f) { - if (!f.isSuccess() && !promise.isDone()) { + if (f.isSuccess()) { + handle.changeState(JobHandle.State.QUEUED); + } else if (!promise.isDone()) { promise.setFailure(f.cause()); } } @@ -456,11 +458,20 @@ class SparkClientImpl implements SparkCl } } + private void handle(ChannelHandlerContext ctx, JobStarted msg) { + JobHandleImpl<?> handle = jobs.get(msg.id); + if (handle != null) { + handle.changeState(JobHandle.State.STARTED); + } else { + LOG.warn("Received event for unknown job {}", msg.id); + } + } + private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { JobHandleImpl<?> handle = jobs.get(msg.clientJobId); if (handle != null) { LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId); - handle.getSparkJobIds().add(msg.sparkJobId); + handle.addSparkJobId(msg.sparkJobId); } else { LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId); } Added: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java?rev=1653341&view=auto ============================================================================== --- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java (added) +++ 
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestJobHandle.java Tue Jan 20 19:21:31 2015 @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client; + +import java.io.Serializable; + +import io.netty.util.concurrent.Promise; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.class) +public class TestJobHandle { + + @Mock private SparkClientImpl client; + @Mock private Promise<Serializable> promise; + @Mock private JobHandle.Listener<Serializable> listener; + @Mock private JobHandle.Listener<Serializable> listener2; + + @Test + public void testStateChanges() throws Exception { + JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job"); + handle.addListener(listener); + + assertTrue(handle.changeState(JobHandle.State.QUEUED)); + verify(listener).onJobQueued(handle); + + assertTrue(handle.changeState(JobHandle.State.STARTED)); + verify(listener).onJobStarted(handle); + + handle.addSparkJobId(1); + 
verify(listener).onSparkJobStarted(same(handle), eq(1)); + + assertTrue(handle.changeState(JobHandle.State.CANCELLED)); + verify(listener).onJobCancelled(handle); + + assertFalse(handle.changeState(JobHandle.State.STARTED)); + assertFalse(handle.changeState(JobHandle.State.FAILED)); + assertFalse(handle.changeState(JobHandle.State.SUCCEEDED)); + } + + @Test + public void testFailedJob() throws Exception { + JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job"); + handle.addListener(listener); + + Throwable cause = new Exception(); + when(promise.cause()).thenReturn(cause); + + assertTrue(handle.changeState(JobHandle.State.FAILED)); + verify(promise).cause(); + verify(listener).onJobFailed(handle, cause); + } + + @Test + public void testSucceededJob() throws Exception { + JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job"); + handle.addListener(listener); + + Serializable result = new Exception(); + when(promise.get()).thenReturn(result); + + assertTrue(handle.changeState(JobHandle.State.SUCCEEDED)); + verify(promise).get(); + verify(listener).onJobSucceeded(handle, result); + } + + @Test + public void testImmediateCallback() throws Exception { + JobHandleImpl<Serializable> handle = new JobHandleImpl<Serializable>(client, promise, "job"); + assertTrue(handle.changeState(JobHandle.State.QUEUED)); + handle.addListener(listener); + verify(listener).onJobQueued(handle); + + handle.changeState(JobHandle.State.STARTED); + handle.addSparkJobId(1); + handle.changeState(JobHandle.State.CANCELLED); + + handle.addListener(listener2); + InOrder inOrder = inOrder(listener2); + inOrder.verify(listener2).onSparkJobStarted(same(handle), eq(1)); + inOrder.verify(listener2).onJobCancelled(same(handle)); + } + +} Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java URL: 
http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1653341&r1=1653340&r2=1653341&view=diff ============================================================================== --- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original) +++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Tue Jan 20 19:21:31 2015 @@ -17,15 +17,11 @@ package org.apache.hive.spark.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; +import java.io.Serializable; import java.net.URL; import java.util.Arrays; import java.util.HashMap; @@ -36,17 +32,19 @@ import java.util.concurrent.TimeUnit; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; +import com.google.common.base.Objects; +import com.google.common.base.Strings; +import com.google.common.io.ByteStreams; import org.apache.hive.spark.counter.SparkCounters; +import org.apache.spark.SparkException; import org.apache.spark.SparkFiles; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.junit.Test; - -import com.google.common.base.Objects; -import com.google.common.base.Strings; -import com.google.common.io.ByteStreams; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public class TestSparkClient { @@ -79,8 +77,19 @@ public class TestSparkClient { runTest(true, new TestFunction() { @Override public void call(SparkClient client) throws Exception { + JobHandle.Listener<String> listener = newListener(); JobHandle<String> 
handle = client.submit(new SimpleJob()); + handle.addListener(listener); assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS)); + + // Try an invalid state transition on the handle. This ensures that the actual state + // change we're interested in actually happened, since internally the handle serializes + // state changes. + assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT)); + + verify(listener).onJobQueued(handle); + verify(listener).onJobStarted(handle); + verify(listener).onJobSucceeded(same(handle), eq(handle.get())); } }); } @@ -101,12 +110,25 @@ public class TestSparkClient { runTest(true, new TestFunction() { @Override public void call(SparkClient client) throws Exception { - JobHandle<String> handle = client.submit(new SimpleJob()); + JobHandle.Listener<String> listener = newListener(); + JobHandle<String> handle = client.submit(new ErrorJob()); + handle.addListener(listener); try { handle.get(TIMEOUT, TimeUnit.SECONDS); + fail("Should have thrown an exception."); } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof IllegalStateException); + assertTrue(ee.getCause() instanceof SparkException); + assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello")); } + + // Try an invalid state transition on the handle. This ensures that the actual state + // change we're interested in actually happened, since internally the handle serializes + // state changes. 
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT)); + + verify(listener).onJobQueued(handle); + verify(listener).onJobStarted(handle); + verify(listener).onJobFailed(same(handle), any(Throwable.class)); } }); } @@ -138,18 +160,26 @@ public class TestSparkClient { runTest(true, new TestFunction() { @Override public void call(SparkClient client) throws Exception { + JobHandle.Listener<Integer> listener = newListener(); JobHandle<Integer> future = client.submit(new AsyncSparkJob()); + future.addListener(listener); future.get(TIMEOUT, TimeUnit.SECONDS); MetricsCollection metrics = future.getMetrics(); assertEquals(1, metrics.getJobIds().size()); assertTrue(metrics.getAllMetrics().executorRunTime > 0L); + verify(listener).onSparkJobStarted(same(future), + eq(metrics.getJobIds().iterator().next())); + JobHandle.Listener<Integer> listener2 = newListener(); JobHandle<Integer> future2 = client.submit(new AsyncSparkJob()); + future2.addListener(listener2); future2.get(TIMEOUT, TimeUnit.SECONDS); MetricsCollection metrics2 = future2.getMetrics(); assertEquals(1, metrics2.getJobIds().size()); assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds())); assertTrue(metrics2.getAllMetrics().executorRunTime > 0L); + verify(listener2).onSparkJobStarted(same(future2), + eq(metrics2.getJobIds().iterator().next())); } }); } @@ -226,6 +256,13 @@ public class TestSparkClient { }); } + private <T extends Serializable> JobHandle.Listener<T> newListener() { + @SuppressWarnings("unchecked") + JobHandle.Listener<T> listener = + (JobHandle.Listener<T>) mock(JobHandle.Listener.class); + return listener; + } + private void runTest(boolean local, TestFunction test) throws Exception { Map<String, String> conf = createConf(local); SparkClientFactory.initialize(conf); @@ -250,6 +287,15 @@ public class TestSparkClient { } } + + private static class ErrorJob implements Job<String> { + + @Override + public String call(JobContext jc) { + throw new 
IllegalStateException("Hello"); + } + + } private static class SparkJob implements Job<Long> {