Repository: samza Updated Branches: refs/heads/master 7ae261afa -> 9126d373d
SAMZA-974 Support finite data sources that have a notion of end of stream Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9126d373 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9126d373 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9126d373 Branch: refs/heads/master Commit: 9126d373d083787e9489f03c61ae59657705cfdc Parents: 7ae261a Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Fri Sep 30 23:07:30 2016 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Fri Sep 30 23:07:30 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + checkstyle/import-control.xml | 1 + .../samza/system/IncomingMessageEnvelope.java | 24 ++- .../system/SystemStreamPartitionIterator.java | 15 +- .../samza/task/EndOfStreamListenerTask.java | 43 ++++ .../clustermanager/ResourceRequestState.java | 12 +- .../org/apache/samza/task/AsyncRunLoop.java | 83 +++++++- .../samza/task/AsyncStreamTaskAdapter.java | 9 +- .../apache/samza/container/TaskInstance.scala | 12 +- .../apache/samza/system/SystemConsumers.scala | 35 +++- .../TestContainerProcessManager.java | 8 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 200 ++++++++++++++++++- .../samza/system/TestSystemConsumers.scala | 6 + 13 files changed, 407 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index ab257d3..2bea27b 100644 --- a/build.gradle +++ b/build.gradle @@ -120,6 +120,7 @@ project(':samza-api') { apply plugin: 'checkstyle' dependencies { + compile "org.slf4j:slf4j-api:$slf4jVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } 
http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7e77702..db6f859 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -33,6 +33,7 @@ <allow pkg="org.apache.commons" /> <allow class="scala.collection.JavaConversions" /> <allow class="scala.collection.JavaConverters" /> + <allow class="scala.Option" /> <allow pkg="scala.runtime" /> <subpackage name="config"> http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index cc860cf..0ced773 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -19,11 +19,18 @@ package org.apache.samza.system; +import java.nio.charset.Charset; + /** - * This class represents a message envelope that is received by a StreamTask for each message that is received from a + * This class represents a message envelope that is received by a StreamTask for each message that is received from a * partition of a specific input stream. */ public class IncomingMessageEnvelope { + + //Offsets starting with a NUL byte are reserved for end-of-stream. 
+ private static final byte[] END_OF_STREAM_BYTES = "\0END_OF_STREAM".getBytes(); + public static final String END_OF_STREAM_OFFSET = new String(END_OF_STREAM_BYTES, Charset.defaultCharset()); + private final SystemStreamPartition systemStreamPartition; private final String offset; private final Object key; @@ -79,6 +86,21 @@ public class IncomingMessageEnvelope { return size; } + public boolean isEndOfStream() { + return END_OF_STREAM_OFFSET.equals(offset); + } + + /** + * Builds an end-of-stream envelope for an SSP. This is used by a {@link SystemConsumer} implementation to + * indicate that it is at end-of-stream. The end-of-stream message should not be delivered to the task implementation. + * + * @param ssp The SSP that is at end-of-stream. + * @return an IncomingMessageEnvelope corresponding to end-of-stream for that SSP. + */ + public static IncomingMessageEnvelope buildEndOfStreamEnvelope(SystemStreamPartition ssp) { + return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, null); + } + @Override public int hashCode() { final int prime = 31; http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java index a8f858a..d1d61ed 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java @@ -19,6 +19,8 @@ package org.apache.samza.system; +import org.apache.samza.SamzaException; + import java.util.ArrayDeque; import java.util.HashSet; import java.util.Iterator; @@ -28,8 +30,6 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; -import org.apache.samza.SamzaException; 
- /** * {@link java.util.Iterator} that wraps a * {@link org.apache.samza.system.SystemConsumer} to iterate over the messages @@ -40,6 +40,7 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn private final SystemConsumer systemConsumer; private final Set<SystemStreamPartition> fetchSet; private Queue<IncomingMessageEnvelope> peeks; + private boolean endOfStreamReached = false; public SystemStreamPartitionIterator(SystemConsumer systemConsumer, SystemStreamPartition systemStreamPartition) { this(systemConsumer, systemStreamPartition, 1000); @@ -67,7 +68,13 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn throw new NoSuchElementException(); } - return peeks.poll(); + IncomingMessageEnvelope envelope = peeks.poll(); + + if (envelope.isEndOfStream()) { + endOfStreamReached = true; + } + + return envelope; } @Override @@ -75,7 +82,7 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn } private void refresh() { - if (peeks.size() == 0) { + if (peeks.size() == 0 && !endOfStreamReached) { try { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES); http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java b/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java new file mode 100644 index 0000000..0f62356 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/EndOfStreamListenerTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * + * The EndOfStreamListenerTask augments {@link StreamTask} allowing the method implementor to specify code to be + * executed when the 'end-of-stream' is reached for all input SSPs. + * + * While some streaming sources are infinite (like Kafka), others such as HDFS and file-based sources are bounded. For instance, + * file based sources have the notion of EOF to indicate that there is no more data. + * + */ +public interface EndOfStreamListenerTask { + + /** + * Guaranteed to be invoked when all SSPs processed by this task have reached their end-of-stream. Users can choose + * to invoke commit on the {@link TaskCoordinator} to commit changes. + * + * @param collector Contains the means of sending message envelopes to an output stream. + * @param coordinator Manages execution of tasks. + * + * @throws Exception Any exception types encountered during the execution of the processing task. 
+ */ + void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception; +} http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java index 39897c7..77c192e 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java @@ -133,15 +133,9 @@ public class ResourceRequestState { * requestCount != 0, it will be greater than the total request count for that host. Hence, it should be * assigned to ANY_HOST */ - log.info( - "The number of containers already allocated on {} is greater than what was " + - "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST", - new Object[]{ - hostName, - requestCountOnThisHost, - samzaResource.getResourceID() - } - ); + log.info("The number of containers already allocated on {} is greater than what was " + + "requested, which is {}. 
Hence, saving the samzaResource {} in the buffer for ANY_HOST", + new Object[]{hostName, requestCountOnThisHost, samzaResource.getResourceID()}); addToAllocatedResourceList(ANY_HOST, samzaResource); } } http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 9a21bf1..77eceea 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -23,6 +23,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,6 +32,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + import org.apache.samza.SamzaException; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; @@ -53,6 +56,7 @@ public class AsyncRunLoop implements Runnable { private final Map<TaskName, AsyncTaskWorker> taskWorkers; private final SystemConsumers consumerMultiplexer; private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping; + private final ExecutorService threadPool; private final CoordinatorRequests coordinatorRequests; private final Object latch; @@ -136,7 +140,6 @@ public class AsyncRunLoop implements Runnable { long startNs = System.nanoTime(); IncomingMessageEnvelope envelope = chooseEnvelope(); - long chooseNs = System.nanoTime(); containerMetrics.chooseNs().update(chooseNs - startNs); @@ -200,6 +203,7 @@ public class 
AsyncRunLoop implements Runnable { } } + /** * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes, * it will resume the runloop. @@ -273,6 +277,7 @@ public class AsyncRunLoop implements Runnable { WINDOW, COMMIT, PROCESS, + END_OF_STREAM, NO_OP } @@ -284,11 +289,14 @@ public class AsyncRunLoop implements Runnable { private final TaskInstance<AsyncStreamTask> task; private final TaskCallbackManager callbackManager; private volatile AsyncTaskState state; + private volatile boolean completed = false; + AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) { this.task = task; this.callbackManager = new TaskCallbackManager(this, task.metrics(), callbackTimer, callbackTimeoutMs); - this.state = new AsyncTaskState(task.taskName(), task.metrics()); + Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task); + this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet); } private void init() { @@ -317,6 +325,20 @@ public class AsyncRunLoop implements Runnable { } /** + * Returns those partitions for the task for which we have not received end-of-stream from the consumer. + * @param task + * @return a Set of SSPs such that all SSPs are not at end of stream. + */ + private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance<AsyncStreamTask> task) { + + Set<SystemStreamPartition> allPartitions = new HashSet<>(JavaConversions.asJavaSet(task.systemStreamPartitions())); + + // filter only those SSPs that are not at end of stream. 
+ Set<SystemStreamPartition> workingSSPSet = allPartitions.stream().filter(ssp -> !consumerMultiplexer.isEndOfStream(ssp)).collect(Collectors.toSet()); + return workingSSPSet; + } + + /** * Invoke next task operation based on its state */ private void run() { @@ -330,12 +352,36 @@ public class AsyncRunLoop implements Runnable { case COMMIT: commit(); break; + case END_OF_STREAM: + endOfStream(); + break; default: //no op break; } } + private void endOfStream() { + state.complete = true; + try { + ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); + + task.endOfStream(coordinator); + // issue a request for shutdown of the task + coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); + coordinatorRequests.update(coordinator); + + // invoke commit on the task - if the endOfStream callback had requested a final commit. + boolean needFinalCommit = coordinatorRequests.commitRequests().remove(task.taskName()); + if (needFinalCommit) { + task.commit(); + } + } finally { + resume(); + } + + } + /** * Process asynchronously. The callback needs to be fired once the processing is done. */ @@ -428,8 +474,6 @@ public class AsyncRunLoop implements Runnable { } } - - /** * Task process completes successfully, update the offsets based on the high-water mark. * Then it will trigger the listener for task state change. 
@@ -494,24 +538,46 @@ public class AsyncRunLoop implements Runnable { private final class AsyncTaskState { private volatile boolean needWindow = false; private volatile boolean needCommit = false; + private volatile boolean complete = false; + private volatile boolean endOfStream = false; private volatile boolean windowOrCommitInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); private final ArrayDeque<PendingEnvelope> pendingEnvelopQueue; + //Set of SSPs that we are currently processing for this task instance + private final Set<SystemStreamPartition> processingSspSet; private final TaskName taskName; private final TaskInstanceMetrics taskMetrics; - AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics) { + AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, Set<SystemStreamPartition> sspSet) { this.taskName = taskName; this.taskMetrics = taskMetrics; this.pendingEnvelopQueue = new ArrayDeque<>(); + this.processingSspSet = sspSet; + } + + + private boolean checkEndOfStream() { + PendingEnvelope pendingEnvelope = pendingEnvelopQueue.peek(); + + if (pendingEnvelope != null) { + IncomingMessageEnvelope envelope = pendingEnvelope.envelope; + + if (envelope.isEndOfStream()) { + SystemStreamPartition ssp = envelope.getSystemStreamPartition(); + processingSspSet.remove(ssp); + pendingEnvelopQueue.remove(); + } + } + return processingSspSet.isEmpty(); } /** * Returns whether the task is ready to do process/window/commit. 
*/ private boolean isReady() { + endOfStream |= checkEndOfStream(); needCommit |= coordinatorRequests.commitRequests().remove(taskName); - if (needWindow || needCommit) { + if (needWindow || needCommit || endOfStream) { // ready for window or commit only when no messages are in progress and // no window/commit in flight return messagesInFlight.get() == 0 && !windowOrCommitInFlight; @@ -526,9 +592,13 @@ public class AsyncRunLoop implements Runnable { * Returns the next operation by this taskWorker */ private WorkerOp nextOp() { + + if (complete) return WorkerOp.NO_OP; + if (isReady()) { if (needCommit) return WorkerOp.COMMIT; else if (needWindow) return WorkerOp.WINDOW; + else if (endOfStream) return WorkerOp.END_OF_STREAM; else if (!pendingEnvelopQueue.isEmpty()) return WorkerOp.PROCESS; } return WorkerOp.NO_OP; @@ -577,6 +647,7 @@ public class AsyncRunLoop implements Runnable { log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize); } + /** * Fetch the pending envelope in the pending queue for the task to process. * Update the chooser for flow control on the SSP level. Once it's updated, the AsyncRunLoop http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java index 1fc6456..e2fea95 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java @@ -30,7 +30,7 @@ import org.apache.samza.system.IncomingMessageEnvelope; * the callbacks once it's done. If the thread pool is null, it follows the legacy * synchronous model to execute the tasks on the run loop thread. 
*/ -public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask { +public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask { private final StreamTask wrappedTask; private final ExecutorService executor; @@ -89,4 +89,11 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi ((ClosableTask) wrappedTask).close(); } } + + @Override + public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + if (wrappedTask instanceof EndOfStreamListenerTask) { + ((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, coordinator); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 89f6857..b068856 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -31,6 +31,7 @@ import org.apache.samza.system.SystemConsumers import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.ClosableTask +import org.apache.samza.task.EndOfStreamListenerTask import org.apache.samza.task.InitableTask import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.StreamTask @@ -58,6 +59,7 @@ class TaskInstance[T]( val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging { val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] + val isEndOfStreamListenerTask = 
task.isInstanceOf[EndOfStreamListenerTask] val isClosableTask = task.isInstanceOf[ClosableTask] val isAsyncTask = task.isInstanceOf[AsyncStreamTask] @@ -168,6 +170,14 @@ class TaskInstance[T]( } } + def endOfStream(coordinator: ReadableCoordinator): Unit = { + if (isEndOfStreamListenerTask) { + exceptionHandler.maybeHandle { + task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, coordinator); + } + } + } + def window(coordinator: ReadableCoordinator) { if (isWindowableTask) { trace("Windowing for taskName: %s" format taskName) @@ -220,7 +230,7 @@ class TaskInstance[T]( override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) - def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s]" format (taskName, isWindowableTask, isClosableTask) + def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s endofstreamlistener=%s]" format (taskName, isWindowableTask, isClosableTask, isEndOfStreamListenerTask) /** * From the envelope, check if this SSP has catched up with the starting offset of the SSP http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index a8355b9..e2aed5b 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import org.apache.samza.serializers.SerdeManager import org.apache.samza.util.{Logging, TimerUtils} -import org.apache.samza.system.chooser.MessageChooser +import org.apache.samza.system.chooser.{DefaultChooser, MessageChooser} 
import org.apache.samza.SamzaException -import java.util.HashMap import java.util.ArrayDeque +import java.util.HashSet +import java.util.HashMap import java.util.Queue import java.util.Set -import java.util.HashSet object SystemConsumers { val DEFAULT_POLL_INTERVAL_MS = 50 @@ -130,6 +130,11 @@ class SystemConsumers ( private val unprocessedMessagesBySSP = new HashMap[SystemStreamPartition, Queue[IncomingMessageEnvelope]]() /** + * Set of SSPs that are currently at end-of-stream. + */ + private val endOfStreamSSPs = new HashSet[SystemStreamPartition]() + + /** * A set of SystemStreamPartitions grouped by systemName. This is used as a * cache to figure out which SystemStreamPartitions we need to poll from the * underlying system consumer. @@ -163,7 +168,6 @@ class SystemConsumers ( def start { debug("Starting consumers.") - emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP .keySet .groupBy(_.getSystem) @@ -190,8 +194,16 @@ class SystemConsumers ( chooser.stop } + def register(systemStreamPartition: SystemStreamPartition, offset: String) { debug("Registering stream: %s, %s" format (systemStreamPartition, offset)) + + if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) { + info("Stream : %s is already at end of stream" format (systemStreamPartition)) + endOfStreamSSPs.add(systemStreamPartition) + return; + } + metrics.registerSystemStreamPartition(systemStreamPartition) unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]()) chooser.register(systemStreamPartition, offset) @@ -203,6 +215,11 @@ class SystemConsumers ( } } + + def isEndOfStream(systemStreamPartition: SystemStreamPartition) = { + endOfStreamSSPs.contains(systemStreamPartition) + } + def choose (updateChooser: Boolean = true): IncomingMessageEnvelope = { val envelopeFromChooser = chooser.choose @@ -217,6 +234,11 @@ class SystemConsumers ( } else { val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition + if 
(envelopeFromChooser.isEndOfStream) { + info("End of stream reached for partition: %s" format systemStreamPartition) + endOfStreamSSPs.add(systemStreamPartition) + } + trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser) // Ok to give the chooser a new message from this stream. @@ -254,7 +276,8 @@ class SystemConsumers ( val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName) // Poll when at least one SSP in this system needs more messages. - if (systemFetchSet.size > 0) { + + if (systemFetchSet != null && systemFetchSet.size > 0) { val consumer = consumers(systemName) trace("Fetching: %s" format systemFetchSet) @@ -262,7 +285,6 @@ class SystemConsumers ( metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchSet.size) val systemStreamPartitionEnvelopes = consumer.poll(systemFetchSet, timeout) - trace("Got incoming message envelopes: %s" format systemStreamPartitionEnvelopes) metrics.systemMessagesPerPoll(systemName).inc @@ -307,7 +329,6 @@ class SystemConsumers ( // Update last poll time so we don't poll too frequently. lastPollNs = clock() - // Poll every system for new messages. 
consumers.keys.map(poll(_)) } http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 57a5da6..0d61814 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -393,10 +393,10 @@ public class TestContainerProcessManager { @Test public void testAppMasterWithFwk() { ContainerProcessManager taskManager = new ContainerProcessManager( - new MapConfig(config), - state, - new MetricsRegistryMap(), - manager + new MapConfig(config), + state, + new MetricsRegistryMap(), + manager ); taskManager.start(); SamzaResource container2 = new SamzaResource(1, 1024, "", "id0"); http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index ca913de..3263e54 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -19,17 +19,23 @@ package org.apache.samza.task; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.samza.Partition; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; @@ -38,13 +44,19 @@ import org.apache.samza.container.TaskInstanceMetrics; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.TestSystemConsumers; + import org.junit.Before; import org.junit.Test; +import scala.Option; import scala.collection.JavaConversions; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -73,6 +85,8 @@ public class TestAsyncRunLoop { IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0"); IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1"); IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp0, "1", "key0", "value0"); + IncomingMessageEnvelope ssp0EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0); + IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1); TestTask task0; TestTask task1; @@ -90,14 +104,19 @@ public class TestAsyncRunLoop { containerMetrics); } - TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) { + 
TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) { TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap()); scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConversions.asScalaSet(Collections.singleton(ssp)).toSet(); return new TaskInstance<AsyncStreamTask>(task, taskName, mock(Config.class), taskInstanceMetrics, - null, consumerMultiplexer, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), - offsetManager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>())); + null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), + manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>())); } + TaskInstance<AsyncStreamTask> createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) { + return createTaskInstance(task, taskName, ssp, offsetManager, consumerMultiplexer); + } + + ExecutorService callbackExecutor; void triggerCallback(final TestTask task, final TaskCallback callback, final boolean success) { callbackExecutor.submit(new Runnable() { @@ -122,7 +141,8 @@ public class TestAsyncRunLoop { void run(TaskCallback callback); } - class TestTask implements AsyncStreamTask, WindowableTask { + + class TestTask implements AsyncStreamTask, WindowableTask, EndOfStreamListenerTask { boolean shutdown = false; boolean commit = false; boolean success; @@ -166,8 +186,14 @@ public class TestAsyncRunLoop { coordinator.shutdown(shutdownRequest); } } + + @Override + public void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) { + coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); + } } + @Before public void setup() { executor = null; @@ -180,7 +206,7 @@ public 
class TestAsyncRunLoop { offsetManager = mock(OffsetManager.class); shutdownRequest = TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER; - when(consumerMultiplexer.pollIntervalMs()).thenReturn(1000000); + when(consumerMultiplexer.pollIntervalMs()).thenReturn(10); tasks = new HashMap<>(); task0 = new TestTask(true, true, false); @@ -191,6 +217,7 @@ public class TestAsyncRunLoop { tasks.put(taskName1, t1); } + @Test public void testProcessMultipleTasks() throws Exception { AsyncRunLoop runLoop = createRunLoop(); @@ -207,6 +234,7 @@ public class TestAsyncRunLoop { assertEquals(2L, containerMetrics.processes().getCount()); } + @Test public void testProcessInOrder() throws Exception { AsyncRunLoop runLoop = createRunLoop(); @@ -223,12 +251,10 @@ public class TestAsyncRunLoop { assertEquals(3L, containerMetrics.processes().getCount()); } - @Test - public void testProcessOutOfOrder() throws Exception { - maxMessagesInFlight = 2; + private TestCode buildOutofOrderCallback() { final CountDownLatch latch = new CountDownLatch(1); - task0.code = new TestCode() { + return new TestCode() { @Override public void run(TaskCallback callback) { IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope; @@ -246,6 +272,13 @@ public class TestAsyncRunLoop { } } }; + } + + @Test + public void testProcessOutOfOrder() throws Exception { + maxMessagesInFlight = 2; + + task0.code = buildOutofOrderCallback(); AsyncRunLoop runLoop = createRunLoop(); when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null); @@ -330,4 +363,153 @@ public class TestAsyncRunLoop { assertEquals(2L, containerMetrics.envelopes().getCount()); assertEquals(2L, containerMetrics.processes().getCount()); } + + @Test + public void testEndOfStreamWithMultipleTasks() throws Exception { + task0 = new TestTask(true, true, false); + task1 = new TestTask(true, true, false); + t0 = createTaskInstance(task0, taskName0, ssp0); + t1 = 
createTaskInstance(task1, taskName1, ssp1); + tasks.put(taskName0, t0); + tasks.put(taskName1, t1); + + AsyncRunLoop runLoop = createRunLoop(); + when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream); + runLoop.run(); + callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + assertEquals(1, task0.processed); + assertEquals(1, task0.completed.get()); + assertEquals(1, task1.processed); + assertEquals(1, task1.completed.get()); + + assertEquals(4L, containerMetrics.envelopes().getCount()); + assertEquals(2L, containerMetrics.processes().getCount()); + } + + @Test + public void testEndOfStreamWithOutOfOrderProcess() throws Exception { + maxMessagesInFlight = 2; + task0 = new TestTask(true, true, false); + task1 = new TestTask(true, true, false); + t0 = createTaskInstance(task0, taskName0, ssp0); + t1 = createTaskInstance(task1, taskName1, ssp1); + tasks.put(taskName0, t0); + tasks.put(taskName1, t1); + + final CountDownLatch latch = new CountDownLatch(1); + task0.code = buildOutofOrderCallback(); + AsyncRunLoop runLoop = createRunLoop(); + when(consumerMultiplexer.choose(false)) + .thenReturn(envelope0) + .thenReturn(envelope3) + .thenReturn(envelope1) + .thenReturn(null) + .thenReturn(ssp0EndOfStream) + .thenReturn(ssp1EndOfStream); + + runLoop.run(); + + callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + assertEquals(2, task0.processed); + assertEquals(2, task0.completed.get()); + assertEquals(1, task1.processed); + assertEquals(1, task1.completed.get()); + assertEquals(5L, containerMetrics.envelopes().getCount()); + assertEquals(3L, containerMetrics.processes().getCount()); + } + + @Test + public void testEndOfStreamCommitBehavior() throws Exception { + //explicitly configure to disable commits inside process or window calls and invoke commit from end of stream + task0 = new TestTask(true, false, false); + task1 = new TestTask(true, false, false); + + t0 = 
createTaskInstance(task0, taskName0, ssp0); + t1 = createTaskInstance(task1, taskName1, ssp1); + tasks.put(taskName0, t0); + tasks.put(taskName1, t1); + AsyncRunLoop runLoop = createRunLoop(); + + when(consumerMultiplexer.choose(false)).thenReturn(envelope0) + .thenReturn(envelope1) + .thenReturn(null) + .thenReturn(ssp0EndOfStream) + .thenReturn(ssp1EndOfStream); + runLoop.run(); + callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + verify(offsetManager).checkpoint(taskName0); + verify(offsetManager).checkpoint(taskName1); + } + + @Test + public void testEndOfStreamOffsetManagement() throws Exception { + //explicitly configure to disable commits inside process or window calls and invoke commit from end of stream + TestTask mockStreamTask1 = new TestTask(true, false, false); + TestTask mockStreamTask2 = new TestTask(true, false, false); + + Config config = new MapConfig(); + + Partition p1 = new Partition(1); + Partition p2 = new Partition(2); + SystemStreamPartition ssp1 = new SystemStreamPartition("system1", "stream1", p1); + SystemStreamPartition ssp2 = new SystemStreamPartition("system1", "stream2", p2); + IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", "key1", "message1"); + IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", "key1", "message1"); + IncomingMessageEnvelope envelope3 = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2); + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new HashMap<>(); + List<IncomingMessageEnvelope> messageList = new ArrayList<>(); + messageList.add(envelope1); + messageList.add(envelope2); + messageList.add(envelope3); + sspMap.put(ssp2, messageList); + + + + SystemConsumer mockConsumer = mock(SystemConsumer.class); + when(mockConsumer.poll((Set<SystemStreamPartition>) anyObject(), anyLong())).thenReturn(sspMap); + + HashMap<String, SystemConsumer> systemConsumerMap = new HashMap<>(); + systemConsumerMap.put("system1", mockConsumer); + 
SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap); + + TaskName taskName1 = new TaskName("task1"); + TaskName taskName2 = new TaskName("task2"); + Set<TaskName> taskNames = new HashSet<>(); + taskNames.add(taskName1); + taskNames.add(taskName2); + + OffsetManager offsetManager = mock(OffsetManager.class); + + when(offsetManager.getLastProcessedOffset(taskName1, ssp1)).thenReturn(Option.apply("3")); + when(offsetManager.getLastProcessedOffset(taskName2, ssp2)).thenReturn(Option.apply("0")); + when(offsetManager.getStartingOffset(taskName1, ssp1)).thenReturn(Option.apply(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)); + when(offsetManager.getStartingOffset(taskName2, ssp2)).thenReturn(Option.apply("1")); + + TaskInstance<AsyncStreamTask> taskInstance1 = createTaskInstance(mockStreamTask1, taskName1, ssp1, offsetManager, consumers); + TaskInstance<AsyncStreamTask> taskInstance2 = createTaskInstance(mockStreamTask2, taskName2, ssp2, offsetManager, consumers); + Map<TaskName, TaskInstance<AsyncStreamTask>> tasks = new HashMap<>(); + tasks.put(taskName1, taskInstance1); + tasks.put(taskName2, taskInstance2); + + taskInstance1.registerConsumers(); + taskInstance2.registerConsumers(); + consumers.start(); + + AsyncRunLoop runLoop = new AsyncRunLoop(tasks, + executor, + consumers, + maxMessagesInFlight, + windowMs, + commitMs, + callbackTimeoutMs, + containerMetrics); + + + runLoop.run(); + callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + + + + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9126d373/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index db2249b..b5d58e3 100644 --- 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -327,3 +327,9 @@ class TestSystemConsumers { def register { super.register(systemStreamPartition, "0") } } } + +object TestSystemConsumers { + def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer]) : SystemConsumers = { + new SystemConsumers(new DefaultChooser, consumers.toMap) + } +}