SAMZA-1080 : Initial Standalone JobCoordinator and StreamProcessor API This patch contains the changes for the standalone StreamProcessor, which runs without any coordination between processors. This works for load-balanced consumers, such as the new Kafka consumer, and for statically partitioned cases.
Additionally, we have introduced TaskFactory for StreamTask and AsyncStreamTask. Author: navina <nav...@apache.org> Reviewers: xinyuiscool,fredji97 Closes #44 from navina/Noop-JC Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a3a5ac7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a3a5ac7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a3a5ac7 Branch: refs/heads/master Commit: 2a3a5ac7f21dc4213ff6ec96e11a798bff096d04 Parents: 154cda2 Author: navina <nav...@apache.org> Authored: Mon Jan 30 18:30:02 2017 -0800 Committer: navina <nav...@apache.org> Committed: Mon Jan 30 18:30:02 2017 -0800 ---------------------------------------------------------------------- NOTICE | 1 + .../samza/task/AsyncStreamTaskFactory.java | 28 ++ .../apache/samza/task/StreamTaskFactory.java | 31 ++ .../samza/config/JobCoordinatorConfig.java | 40 ++ .../org/apache/samza/config/TaskConfigJava.java | 25 ++ .../apache/samza/container/RunLoopFactory.java | 20 +- .../AllSspToSingleTaskGrouperFactory.java | 69 ++++ .../task/SingleContainerGrouperFactory.java | 56 +++ .../samza/coordinator/JobCoordinator.java | 72 ++++ .../coordinator/JobCoordinatorFactory.java | 35 ++ .../processor/SamzaContainerController.java | 152 ++++++++ .../apache/samza/processor/StreamProcessor.java | 173 +++++++++ .../standalone/StandaloneJobCoordinator.java | 148 ++++++++ .../StandaloneJobCoordinatorFactory.java | 31 ++ .../org/apache/samza/task/AsyncRunLoop.java | 37 +- .../org/apache/samza/config/JobConfig.scala | 3 + .../org/apache/samza/container/RunLoop.scala | 18 +- .../apache/samza/container/SamzaContainer.scala | 178 ++++++--- .../apache/samza/container/TaskInstance.scala | 21 +- .../samza/coordinator/JobCoordinator.scala | 360 ------------------ .../samza/coordinator/JobModelManager.scala | 362 +++++++++++++++++++ .../samza/job/local/ThreadJobFactory.scala | 23 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 20 +- 
.../apache/samza/container/TestRunLoop.scala | 15 +- .../samza/container/TestSamzaContainer.scala | 6 +- .../samza/container/TestTaskInstance.scala | 6 +- .../test/StandaloneIntegrationTestHarness.java | 84 +++++ .../apache/samza/test/StandaloneTestUtils.java | 111 ++++++ .../test/processor/IdentityStreamTask.java | 72 ++++ .../test/processor/TestStreamProcessor.java | 226 ++++++++++++ .../AbstractIntegrationTestHarness.scala | 60 +++ .../AbstractKafkaServerTestHarness.scala | 123 +++++++ .../harness/AbstractZookeeperTestHarness.scala | 72 ++++ 33 files changed, 2185 insertions(+), 493 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 3352dda..2ee5fdc 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). + http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java new file mode 100644 index 0000000..e5ce9c4 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * Build {@link AsyncStreamTask} instances. + * Implementations should return a new instance for each {@link #createInstance()} invocation. + */ +public interface AsyncStreamTaskFactory { + AsyncStreamTask createInstance(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java new file mode 100644 index 0000000..52adef6 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import org.apache.samza.annotation.InterfaceStability; + +/** + * Build {@link StreamTask} instances. + * Implementations should return a new instance for each {@link #createInstance()} invocation. + */ +@InterfaceStability.Stable +public interface StreamTaskFactory { + StreamTask createInstance(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java new file mode 100644 index 0000000..946a308 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.config; + +import com.google.common.base.Strings; + +public class JobCoordinatorConfig extends MapConfig { + public static final String JOB_COORDINATOR_FACTORY = "job-coordinator.factory"; + + public JobCoordinatorConfig(Config config) { + super(config); + } + + public String getJobCoordinatorFactoryClassName() { + String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY); + if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) { + throw new ConfigException( + String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY)); + } + + return jobCoordinatorFactoryClassName; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index 648fe58..e6db74a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -35,6 +35,10 @@ import scala.collection.JavaConversions; public class TaskConfigJava extends MapConfig { + // Task Configs + private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; + public static final long DEFAULT_TASK_SHUTDOWN_MS = 5000L; + // broadcast streams consumed by all tasks. e.g. 
kafka.foo#1 public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs"; private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$"; @@ -117,4 +121,29 @@ public class TaskConfigJava extends MapConfig { return Collections.unmodifiableSet(allInputSS); } + + /** + * Returns how long, in milliseconds, to wait for the tasks to shut down. + * If the value is not defined in the config, or if it does not parse as a long, the default value + * {@value #DEFAULT_TASK_SHUTDOWN_MS} is returned; a warning is logged only for values that fail to parse. + * + * @return how long, in milliseconds, to wait for all the tasks to shut down + */ + public long getShutdownMs() { + String shutdownMs = get(TASK_SHUTDOWN_MS); + // A missing value is the normal case, not a parse error, so fall back to the default without warning + if (shutdownMs == null || shutdownMs.isEmpty()) { + return DEFAULT_TASK_SHUTDOWN_MS; + } + try { + return Long.parseLong(shutdownMs); + } catch (NumberFormatException nfe) { + LOGGER.warn(String.format( + "Unable to parse user-configured value for %s - %s. Using default value %d", + TASK_SHUTDOWN_MS, + shutdownMs, + DEFAULT_TASK_SHUTDOWN_MS)); + return DEFAULT_TASK_SHUTDOWN_MS; + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index 23a68cb..32ab47a 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -19,20 +19,19 @@ package org.apache.samza.container; -import java.util.concurrent.ExecutorService; import org.apache.samza.SamzaException; import org.apache.samza.config.TaskConfig; -import org.apache.samza.util.HighResolutionClock; import org.apache.samza.system.SystemConsumers; import org.apache.samza.task.AsyncRunLoop; -import org.apache.samza.task.AsyncStreamTask; -import org.apache.samza.task.StreamTask; +import org.apache.samza.util.HighResolutionClock; import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; import scala.runtime.AbstractFunction0; import scala.runtime.AbstractFunction1; +import java.util.concurrent.ExecutorService; + import static org.apache.samza.util.Util.asScalaClock; /** @@ -46,7 +45,7 @@ public class RunLoopFactory { private static final long DEFAULT_COMMIT_MS = 60000L; private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L; - public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> taskInstances, + public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance> taskInstances, SystemConsumers consumerMultiplexer, ExecutorService threadPool, long maxThrottlingDelayMs, @@ -62,9 +61,9 @@ public class RunLoopFactory { log.info("Got commit milliseconds: " + taskCommitMs); - int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance<?>, Object>() { + int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance, Object>() { @Override - public Boolean apply(TaskInstance<?> t) { + public Boolean apply(TaskInstance t) { return t.isAsyncTask(); } }); @@ -77,9 +76,8 @@ public class RunLoopFactory { if (asyncTaskCount == 0) { log.info("Run loop in single thread mode."); - scala.collection.immutable.Map<TaskName, TaskInstance<StreamTask>> streamTaskInstances = (scala.collection.immutable.Map) taskInstances; return new RunLoop( - streamTaskInstances, + taskInstances, consumerMultiplexer, containerMetrics, maxThrottlingDelayMs, @@ -95,12 +93,10 @@ public class RunLoopFactory { log.info("Got callback timeout: " + callbackTimeout); - scala.collection.immutable.Map<TaskName, TaskInstance<AsyncStreamTask>> asyncStreamTaskInstances = (scala.collection.immutable.Map) taskInstances; - log.info("Run loop in asynchronous mode."); return new AsyncRunLoop( - JavaConversions.mapAsJavaMap(asyncStreamTaskInstances), + JavaConversions.mapAsJavaMap(taskInstances), 
threadPool, consumerMultiplexer, taskMaxConcurrency, http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java new file mode 100644 index 0000000..2d22977 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.stream; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * AllSspToSingleTaskGrouper, as the name suggests, assigns all partitions to be consumed by a single TaskInstance + * This is useful, in case of using load-balanced consumers like the new Kafka consumer, Samza doesn't control the + * partitions being consumed by a task. Hence, it is assumed that there is only 1 task that processes all messages, + * irrespective of which partition it belongs to. + * This also implies that container and tasks are synonymous when this grouper is used. Taskname(s) has to be globally + * unique within a given job. + * + * Note: This grouper does not take in broadcast streams yet. + */ +class AllSspToSingleTaskGrouper implements SystemStreamPartitionGrouper { + private final int containerId; + + public AllSspToSingleTaskGrouper(int containerId) { + this.containerId = containerId; + } + + @Override + public Map<TaskName, Set<SystemStreamPartition>> group(final Set<SystemStreamPartition> ssps) { + if (ssps == null) { + throw new SamzaException("ssp set cannot be null!"); + } + if (ssps.size() == 0) { + throw new SamzaException("Cannot process stream task with no input system stream partitions"); + } + + final TaskName taskName = new TaskName(String.format("Task-%s", String.valueOf(containerId))); + + return Collections.singletonMap(taskName, ssps); + } +} + +public class AllSspToSingleTaskGrouperFactory implements SystemStreamPartitionGrouperFactory { + @Override + public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) { + return new AllSspToSingleTaskGrouper(config.getInt(JobConfig.PROCESSOR_ID())); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java new file mode 100644 index 0000000..980f2a9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.container.grouper.task; + +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class SingleContainerGrouperFactory implements TaskNameGrouperFactory { + @Override + public TaskNameGrouper build(Config config) { + return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID())); + } +} + +class SingleContainerGrouper implements TaskNameGrouper { + private final int containerId; + + SingleContainerGrouper(int containerId) { + this.containerId = containerId; + } + + @Override + public Set<ContainerModel> group(Set<TaskModel> taskModels) { + Map<TaskName, TaskModel> taskNameTaskModelMap = new HashMap<>(); + for (TaskModel taskModel: taskModels) { + taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel); + } + ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap); + return Collections.singleton(containerModel); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java new file mode 100644 index 0000000..252e56b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.job.model.JobModel; + +/** + * A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor. + * In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require + * coordination with JobCoordinators of other StreamProcessors. + * */ +@InterfaceStability.Evolving +public interface JobCoordinator { + /** + * Starts the JobCoordinator which involves one or more of the following: + * * LeaderElector Module initialization, if any + * * If leader, generate JobModel. Else, read JobModel + */ + void start(); + + /** + * Cleanly shutting down the JobCoordinator involves: + * * Shutting down the Container + * * Shutting down the LeaderElection module (TBD: details depending on leader or not) + */ + void stop(); + + /** + * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to + * process messages. + * In a Standalone use-case, it may be sufficient to wait for the container to start-up. + * In a ZK based Standalone use-case, it also includes registration with ZK, initialization of the + * leader elector module, container start-up etc. 
+ * + * @param timeoutMs Maximum time to wait, in milliseconds + * @return {@code true}, if the JobCoordinator is started within the specified wait time and {@code false} if the + * waiting time elapsed + * @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up + */ + boolean awaitStart(long timeoutMs) throws InterruptedException; + /** + * Returns the logical ID assigned to the processor + * It is up to the user to ensure that different instances of StreamProcessor within a job have unique processor ID. + * @return integer representing the logical processor ID + */ + int getProcessorId(); + + /** + * Returns the current JobModel + * The implementation of the JobCoordinator in the leader needs to know how to read the config and generate JobModel + * In case of a non-leader, the JobCoordinator should simply fetch the jobmodel + * @return instance of JobModel that describes the partition distribution among the processors (and hence, tasks) + */ + JobModel getJobModel(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java new file mode 100644 index 0000000..3da70e0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.processor.SamzaContainerController; + +@InterfaceStability.Evolving +public interface JobCoordinatorFactory { + /** + * @param processorId Unique identifier for the processor + * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" + * @param containerController Controller used for starting and stopping the container. In future, it may simply + * pause the container and add/remove tasks + * @return An instance of {@link JobCoordinator} for the given processor + */ + JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java new file mode 100644 index 0000000..d448d30 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.processor; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.SamzaContainer$; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.metrics.JmxServer; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SamzaContainerController { + private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class); + + private final ExecutorService executorService; + private volatile SamzaContainer container; + private final Map<String, MetricsReporter> metricsReporterMap; + private final Object taskFactory; + private final long
containerShutdownMs; + + // Internal Member Variables; volatile because it is written by startContainer and read by stopContainer + private volatile Future<?> containerFuture; + + /** + * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}. + * Requests to execute a container are submitted to a single-threaded {@link ExecutorService}. + * + * @param taskFactory Factory used to create instances of {@link org.apache.samza.task.StreamTask} or + * {@link org.apache.samza.task.AsyncStreamTask} + * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances; + * -1 selects {@link TaskConfigJava#DEFAULT_TASK_SHUTDOWN_MS} + * @param processorId Identifier of the owning processor; used here only to name the container thread + * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance + */ + public SamzaContainerController( + Object taskFactory, + long containerShutdownMs, + String processorId, + Map<String, MetricsReporter> metricsReporterMap) { + this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("p" + processorId + "-container-thread-%d").build()); + this.taskFactory = taskFactory; + this.metricsReporterMap = metricsReporterMap; + if (containerShutdownMs == -1) { + this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS; + } else { + this.containerShutdownMs = containerShutdownMs; + } + } + + /** + * Instantiates a container and submits it to the executor. This method does not actually wait for the container to + * fully start-up. For such a behavior, see {@link #awaitStart(long)} + * <p> + * <b>Note:</b> <i>This method does not stop a currently running container, if any.
It is left up to the caller to + * ensure that the container has been stopped with stopContainer before invoking this method.</i> + * + * @param containerModel {@link ContainerModel} instance to use for the current run of the Container + * @param config Complete configuration map used by the Samza job + * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams + * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments + */ + public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) { + LocalityManager localityManager = null; + if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { + localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config); + } + log.info("About to create container: " + containerModel.getContainerId()); + final SamzaContainer newContainer = SamzaContainer$.MODULE$.apply( + containerModel.getContainerId(), + containerModel, + config, + maxChangelogStreamPartitions, + localityManager, + new JmxServer(), + Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap), + taskFactory); + container = newContainer; + log.info("About to start container: " + containerModel.getContainerId()); + // Run the instance created above instead of re-reading the volatile field when the task executes; otherwise + // overlapping startContainer calls could run the newer container twice and never run the older one. + containerFuture = executorService.submit(() -> newContainer.run()); + } + + /** + * Waits for a specified amount of time for the container to fully start-up, which consists of class-loading + * all the components and starting message processing. + * + * @param timeoutMs Maximum time to wait, in milliseconds + * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting + * time elapsed + * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up + * @throws IllegalStateException if no container has been created yet via {@link #startContainer(ContainerModel, Config, int)} + */ + public boolean awaitStart(long timeoutMs) throws InterruptedException { + SamzaContainer currentContainer = container; + if (currentContainer == null) { + throw new IllegalStateException("Cannot await start-up before a container was created."); + } + return currentContainer.awaitStart(timeoutMs); + } + + /** + * Stops a running container, if any.
Invoking this method multiple times does not have any side-effects. + * Waits up to the configured container shutdown time for the container's run task to finish. + */ + public void stopContainer() { + // Read the volatile field once so the null check and the shutdown act on the same instance + SamzaContainer currentContainer = container; + if (currentContainer == null) { + log.warn("Shutdown before a container was created."); + return; + } + + currentContainer.shutdown(); + Future<?> currentFuture = containerFuture; + if (currentFuture == null) { + return; + } + try { + currentFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("Ran into problems while trying to stop the container in the processor!", e); + // Restore the interrupt status so that callers further up the stack can observe it + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Ran into problems while trying to stop the container in the processor!", e); + } catch (TimeoutException e) { + log.warn("Got Timeout Exception while trying to stop the container in the processor! The processor may not shutdown properly", e); + } + } + + /** + * Shuts down the controller by first stopping any running container and then shutting down the {@link ExecutorService}. + */ + public void shutdown() { + stopContainer(); + executorService.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java new file mode 100644 index 0000000..5e90c56 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.processor; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.metrics.MetricsReporter; +import org.apache.samza.task.AsyncStreamTaskFactory; +import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an + * independent process. + * <p> + * <b>Usage Example:</b> + * <pre> + * StreamProcessor processor = new StreamProcessor(1, config, Collections.emptyMap()); + * processor.start(); + * try { + * boolean status = processor.awaitStart(TIMEOUT_MS); // Optional - blocking call + * if (!status) { + * // Timed out + * } + * ... + * } catch (InterruptedException ie) { + * ... + * } finally { + * processor.stop(); + * } + * </pre> + * Note: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in + * multiple threads.
+ */ +@InterfaceStability.Evolving +public class StreamProcessor { + private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); + /** + * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor. + * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For + * example, Yarn provides a "containerId" for every resource it allocates. + * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API. + * <p> + * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors. + */ + private static final String PROCESSOR_ID = org.apache.samza.config.JobConfig$.MODULE$.PROCESSOR_ID(); + private final int processorId; + private final JobCoordinator jobCoordinator; + + /** + * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container + * <p> + * JobCoordinator controls how the various StreamProcessor instances belonging to a job coordinate. It is also + * responsible for generating and updating the JobModel. + * When StreamProcessor starts, it starts the JobCoordinator and brings up a SamzaContainer based on the JobModel. + * SamzaContainer is executed using an ExecutorService. + * <p> + * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user + * + * @param processorId Unique identifier for a processor within the job. It has the same semantics as + * "containerId" in Samza + * @param config Instance of config object - contains all configuration required for processing + * @param customMetricsReporters Map of custom MetricsReporter instances that are to be injected in the Samza job + * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
+ */ + public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, + AsyncStreamTaskFactory asyncStreamTaskFactory) { + this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory); + } + + /** + * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created + * using the provided {@link StreamTaskFactory}. + */ + public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, + StreamTaskFactory streamTaskFactory) { + this(processorId, config, customMetricsReporters, (Object) streamTaskFactory); + } + + /** + * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created + * using the "task.class" configuration instead of a task factory. + */ + public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) { + this(processorId, config, customMetricsReporters, (Object) null); + } + + private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, + Object taskFactory) { + this.processorId = processorId; + + // Copy the caller's config and add processor.id; the copy is what gets passed to the JobCoordinator + Map<String, String> updatedConfigMap = new HashMap<>(config); + updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId)); + Config updatedConfig = new MapConfig(updatedConfigMap); + + // taskFactory is a StreamTaskFactory, an AsyncStreamTaskFactory, or null (tasks then come from "task.class") + SamzaContainerController containerController = new SamzaContainerController( + taskFactory, + new TaskConfigJava(updatedConfig).getShutdownMs(), + String.valueOf(processorId), + customMetricsReporters); + + this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj( + new JobCoordinatorConfig(updatedConfig) + .getJobCoordinatorFactoryClassName()) + .getJobCoordinator(processorId, updatedConfig, containerController); + } + + /** + * StreamProcessor Lifecycle: start() + * <ul> + * <li>Starts the JobCoordinator and fetches the JobModel</li> + * <li>jobCoordinator.start returns after starting the container using ContainerModel</li> + * </ul> + * When start() returns, it only guarantees that the container is initialized and submitted by the controller to + * execute. + */ + public void start() { + jobCoordinator.start(); + } + + /** + * Method that allows the user to wait for a specified amount of time for the container to initialize and start + * processing messages + * + * @param timeoutMs Maximum time to wait, in milliseconds + * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting time + * elapsed + * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up + */ + public boolean awaitStart(long timeoutMs) throws InterruptedException { + return jobCoordinator.awaitStart(timeoutMs); + } + + /** + * StreamProcessor Lifecycle: stop() + * <ul> + * <li>Stops the SamzaContainer execution</li> + * <li>Stops the JobCoordinator</li> + * </ul> + */ + public void stop() { + jobCoordinator.stop(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java new file mode 100644 index 0000000..1401725 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.standalone; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.JobModelManager$; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.processor.SamzaContainerController; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.util.SystemClock; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Standalone Job Coordinator does not implement any leader elector module or cluster manager. + * + * It generates the JobModel using the Config passed into the constructor.
+ * + * Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition + * distribution mechanisms: + * <ul> + * <li> + * Consumer-managed Partition Distribution - For example, using the Kafka consumer which also handles partition + * load balancing across its consumers. In such a case, all input SystemStreamPartition(s) can be grouped to the same + * task instance using {@link org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and the + * task can be added to a single container using + * {@link org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}. + * </li> + * <li> + * User-defined Fixed Partition Distribution - For example, the application may always run a fixed number of + * processors and use a static distribution of partitions that doesn't change. This can be achieved by adding custom + * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and + * {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
+ * </li> + * </ul> + */ +public class StandaloneJobCoordinator implements JobCoordinator { + private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class); + private final int processorId; + private final Config config; + private final JobModelManager jobModelManager; + private final SamzaContainerController containerController; + + @VisibleForTesting + StandaloneJobCoordinator( + int processorId, + Config config, + SamzaContainerController containerController, + JobModelManager jobModelManager) { + this.processorId = processorId; + this.config = config; + this.containerController = containerController; + this.jobModelManager = jobModelManager; + } + + public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { + this.processorId = processorId; + this.config = config; + this.containerController = containerController; + // Build one SystemAdmin per configured system; they back the StreamMetadataCache handed to JobModelManager + JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); + Map<String, SystemAdmin> systemAdmins = new HashMap<>(); + for (String systemName: systemConfig.getSystemNames()) { + String systemFactoryClassName = systemConfig.getSystemFactory(systemName); + if (systemFactoryClassName == null) { + log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); + } + SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName); + systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); + } + + StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); // 5000 ms metadata cache TTL + + /** TODO: + * Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, + * in SamzaContainer for writing locality info to the coordinator stream.
This closely couples together + * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator + * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) + */ + this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + } + + @Override + public void start() { + JobModel jobModel = getJobModel(); + ContainerModel containerModel = jobModel.getContainers().get(processorId); + if (containerModel == null) { + throw new SamzaException("JobModel does not contain a container for processor id " + processorId); + } + containerController.startContainer(containerModel, jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions); + } + + @Override + public void stop() { + containerController.shutdown(); + } + + /** + * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to + * process messages. In a Standalone use-case, it may be sufficient to wait for the container to start-up. In case of + * ZK based Standalone use-case, it also includes registration with ZK, the initialization of leader elector module etc.
+ * + * @param timeoutMs Maximum time to wait, in milliseconds + */ + @Override + public boolean awaitStart(long timeoutMs) throws InterruptedException { + return containerController.awaitStart(timeoutMs); + } + + @Override + public int getProcessorId() { + return this.processorId; + } + + @Override + public JobModel getJobModel() { + return jobModelManager.jobModel(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java new file mode 100644 index 0000000..7ca85c0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.standalone; + +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobCoordinatorFactory; +import org.apache.samza.processor.SamzaContainerController; + +public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory { + @Override + public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { + return new StandaloneJobCoordinator(processorId, config, containerController); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 5c5ea65..064402c 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -75,7 +75,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private volatile Throwable throwable = null; private final HighResolutionClock clock; - public AsyncRunLoop(Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, + public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances, ExecutorService threadPool, SystemConsumers consumerMultiplexer, int maxConcurrency, @@ -100,7 +100,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { this.workerTimer = Executors.newSingleThreadScheduledExecutor(); this.clock = clock; Map<TaskName, AsyncTaskWorker> workers = new HashMap<>(); - for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) { + for (TaskInstance task : taskInstances.values()) { workers.put(task.taskName(), new AsyncTaskWorker(task)); } // Partions and tasks assigned to the container will not change during the run loop life
time @@ -112,14 +112,12 @@ public class AsyncRunLoop implements Runnable, Throttleable { * Returns mapping of the SystemStreamPartition to the AsyncTaskWorkers to efficiently route the envelopes */ private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap( - Map<TaskName, TaskInstance<AsyncStreamTask>> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) { + Map<TaskName, TaskInstance> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) { Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>(); - for (TaskInstance<AsyncStreamTask> task : taskInstances.values()) { + for (TaskInstance task : taskInstances.values()) { Set<SystemStreamPartition> ssps = JavaConversions.setAsJavaSet(task.systemStreamPartitions()); for (SystemStreamPartition ssp : ssps) { - if (sspToWorkerMap.get(ssp) == null) { - sspToWorkerMap.put(ssp, new ArrayList<AsyncTaskWorker>()); - } + sspToWorkerMap.computeIfAbsent(ssp, k -> new ArrayList<>()); sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName())); } } @@ -202,7 +200,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { private IncomingMessageEnvelope chooseEnvelope() { IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false); if (envelope != null) { - log.trace("Choose envelope ssp {} offset {} for processing", envelope.getSystemStreamPartition(), envelope.getOffset()); + log.trace("Choose envelope ssp {} offset {} for processing", + envelope.getSystemStreamPartition(), envelope.getOffset()); containerMetrics.envelopes().inc(); } else { log.trace("No envelope is available"); @@ -310,11 +309,11 @@ public class AsyncRunLoop implements Runnable, Throttleable { * will run the task asynchronously. It runs window and commit in the provided thread pool.
*/ private class AsyncTaskWorker implements TaskCallbackListener { - private final TaskInstance<AsyncStreamTask> task; + private final TaskInstance task; private final TaskCallbackManager callbackManager; private volatile AsyncTaskState state; - AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) { + AsyncTaskWorker(TaskInstance task) { this.task = task; this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock); Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task); @@ -351,12 +350,14 @@ public class AsyncRunLoop implements Runnable, Throttleable { * @param task * @return a Set of SSPs such that all SSPs are not at end of stream. */ - private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance<AsyncStreamTask> task) { + private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance task) { Set<SystemStreamPartition> allPartitions = new HashSet<>(JavaConversions.setAsJavaSet(task.systemStreamPartitions())); // filter only those SSPs that are not at end of stream.
- Set<SystemStreamPartition> workingSSPSet = allPartitions.stream().filter(ssp -> !consumerMultiplexer.isEndOfStream(ssp)).collect(Collectors.toSet()); + Set<SystemStreamPartition> workingSSPSet = allPartitions.stream() + .filter(ssp -> !consumerMultiplexer.isEndOfStream(ssp)) + .collect(Collectors.toSet()); return workingSSPSet; } @@ -511,15 +512,18 @@ public class AsyncRunLoop implements Runnable, Throttleable { state.doneProcess(); TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs); - log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); + log.trace("Got callback complete for task {}, ssp {}", + callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl); if (callbackToUpdate != null) { IncomingMessageEnvelope envelope = callbackToUpdate.envelope; - log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); + log.trace("Update offset for ssp {}, offset {}", + envelope.getSystemStreamPartition(), envelope.getOffset()); // update offset - task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset()); + task.offsetManager().update(task.taskName(), + envelope.getSystemStreamPartition(), envelope.getOffset()); // update coordinator coordinatorRequests.update(callbackToUpdate.coordinator); @@ -695,7 +699,8 @@ public class AsyncRunLoop implements Runnable, Throttleable { PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove(); int queueSize = pendingEnvelopeQueue.size(); taskMetrics.pendingMessages().set(queueSize); - log.trace("fetch envelope ssp {} offset {} to process.", pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset()); + log.trace("fetch envelope ssp {} offset {} to
process.", + pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset()); log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize); if (pendingEnvelope.markProcessed()) { http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 13b72fa..b64e406 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -75,6 +75,9 @@ object JobConfig { val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000 val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory" + // Processor Config Constants + val PROCESSOR_ID = "processor.id" + implicit def Config2Job(config: Config) = new JobConfig(config) /** http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 7df7d88..b1ab1e0 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -22,7 +22,6 @@ package org.apache.samza.container import org.apache.samza.task.CoordinatorRequests import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition} import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.task.StreamTask import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, TimerUtils} import scala.collection.JavaConversions._ @@ -37,7
+36,7 @@ import scala.collection.JavaConversions._ * be done when. */ class RunLoop ( - val taskInstances: Map[TaskName, TaskInstance[StreamTask]], + val taskInstances: Map[TaskName, TaskInstance], val consumerMultiplexer: SystemConsumers, val metrics: SamzaContainerMetrics, val maxThrottlingDelayMs: Long, @@ -57,13 +56,16 @@ class RunLoop ( // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them. val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping - def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance[StreamTask]]] = { - // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly - def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance[StreamTask]) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap + def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = { + // We could just pass in the SystemStreamPartitionMap during construction, + // but it's safer and cleaner to derive the information directly + def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = + taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap - taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map { - case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList - } + taskInstances.values + .flatMap(getSystemStreamPartitionToTaskInstance) + .groupBy(_._1) + .map { case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index e49da57..c3308bf 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -22,9 +22,7 @@ package org.apache.samza.container import java.io.File import java.nio.file.Path import java.util -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit +import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} import java.lang.Thread.UncaughtExceptionHandler import java.net.{URL, UnknownHostException} import org.apache.samza.SamzaException @@ -32,7 +30,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.ShellCommandConfig +import org.apache.samza.config.{Config, ShellCommandConfig} import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System @@ -69,7 +67,9 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory import org.apache.samza.task.AsyncRunLoop import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.AsyncStreamTaskAdapter +import org.apache.samza.task.AsyncStreamTaskFactory import org.apache.samza.task.StreamTask +import org.apache.samza.task.StreamTaskFactory import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.HighResolutionClock import org.apache.samza.util.ExponentialSleepStrategy @@ -112,7 +112,14 @@ object SamzaContainer extends Logging { try { jmxServer = newJmxServer() - SamzaContainer(containerModel, jobModel, jmxServer).run + val containerModel = 
jobModel.getContainers.get(containerId.toInt) + SamzaContainer( + containerId.toInt, + containerModel, + config, + jobModel.maxChangeLogStreamPartitions, + getLocalityManager(containerId, config), + jmxServer).run } finally { if (jmxServer != null) { jmxServer.stop @@ -120,6 +127,17 @@ object SamzaContainer extends Logging { } } + def getLocalityManager(containerId: Int, config: Config): LocalityManager = { + val containerName = getSamzaContainerName(containerId) + val registryMap = new MetricsRegistryMap(containerName) + val coordinatorSystemProducer = + new CoordinatorStreamSystemFactory() + .getCoordinatorStreamSystemProducer( + config, + new SamzaContainerMetrics(containerName, registryMap).registry) + new LocalityManager(coordinatorSystemProducer) + } + /** * Fetches config, task:SSP assignments, and task:changelog partition * assignments, and returns objects to be used for SamzaContainer's @@ -136,10 +154,20 @@ object SamzaContainer extends Logging { classOf[JobModel]) } - def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { - val config = jobModel.getConfig - val containerId = containerModel.getContainerId - val containerName = "samza-container-%s" format containerId + def getSamzaContainerName(containerId: Int): String = { + "samza-container-%d" format containerId + } + + def apply( + containerId: Int, + containerModel: ContainerModel, + config: Config, + maxChangeLogStreamPartitions: Int, + localityManager: LocalityManager, + jmxServer: JmxServer, + customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](), + taskFactory: Object = null) = { + val containerName = getSamzaContainerName(containerId) val containerPID = Util.getContainerPID info("Setting up Samza container: %s" format containerName) @@ -207,12 +235,6 @@ object SamzaContainer extends Logging { info("Got input stream metadata: %s" format inputStreamMetadata) - val taskClassName = config - .getTaskClass - .getOrElse(throw new 
SamzaException("No task class defined in configuration.")) - - info("Got stream task class: %s" format taskClassName) - val consumers = inputSystems .map(systemName => { val systemFactory = systemFactories(systemName) @@ -221,7 +243,7 @@ object SamzaContainer extends Logging { (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry)) } catch { case e: Exception => - error("Failed to create a consumer for %s, so skipping." format(systemName), e) + error("Failed to create a consumer for %s, so skipping." format systemName, e) (systemName, null) } }) @@ -230,11 +252,6 @@ object SamzaContainer extends Logging { info("Got system consumers: %s" format consumers.keys) - val isAsyncTask = classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName)) - if (isAsyncTask) { - info("%s is AsyncStreamTask" format taskClassName) - } - val producers = systemFactories .map { case (systemName, systemFactory) => @@ -242,12 +259,11 @@ object SamzaContainer extends Logging { (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry)) } catch { case e: Exception => - error("Failed to create a producer for %s, so skipping." format(systemName), e) + error("Failed to create a producer for %s, so skipping." format systemName, e) (systemName, null) } } .filter(_._2 != null) - .toMap info("Got system producers: %s" format producers.keys) @@ -265,44 +281,48 @@ object SamzaContainer extends Logging { info("Got serdes: %s" format serdes.keys) /* - * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined in the config. This is useful to build both key and message serde maps. + * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined + * in the config. This is useful to build both key and message serde maps. 
*/ val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => { systemNames .filter(systemName => getSerdeName(systemName).isDefined) .map(systemName => { val serdeName = getSerdeName(systemName).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format serdeName)) + val serde = serdes.getOrElse(serdeName, + throw new SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format serdeName)) (systemName, serde) }).toMap } /* - * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps. + * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. + * This is useful to build both key and message serde maps. */ val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => { (serdeStreams ++ inputSystemStreamPartitions) .filter(systemStream => getSerdeName(systemStream).isDefined) .map(systemStream => { val serdeName = getSerdeName(systemStream).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." format serdeName)) + val serde = serdes.getOrElse(serdeName, + throw new SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." 
format serdeName)) (systemStream, serde) }).toMap } - val systemKeySerdes = buildSystemSerdeMap((systemName: String) => config.getSystemKeySerde(systemName)) + val systemKeySerdes = buildSystemSerdeMap(systemName => config.getSystemKeySerde(systemName)) debug("Got system key serdes: %s" format systemKeySerdes) - val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => config.getSystemMsgSerde(systemName)) + val systemMessageSerdes = buildSystemSerdeMap(systemName => config.getSystemMsgSerde(systemName)) debug("Got system message serdes: %s" format systemMessageSerdes) - val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamKeySerde(systemStream)) + val systemStreamKeySerdes = buildSystemStreamSerdeMap(systemStream => config.getStreamKeySerde(systemStream)) debug("Got system stream key serdes: %s" format systemStreamKeySerdes) - val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamMsgSerde(systemStream)) + val systemStreamMessageSerdes = buildSystemStreamSerdeMap(systemStream => config.getStreamMsgSerde(systemStream)) debug("Got system stream message serdes: %s" format systemStreamMessageSerdes) @@ -349,15 +369,10 @@ object SamzaContainer extends Logging { } info("Got security manager: %s" format securityManager) - val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry) - val localityManager = new LocalityManager(coordinatorSystemProducer) - val checkpointManager = config.getCheckpointManagerFactory() match { - case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) => - Util - .getObj[CheckpointManagerFactory](checkpointFactoryClassName) - .getCheckpointManager(config, samzaContainerMetrics.registry) - case _ => null - } + val checkpointManager = config.getCheckpointManagerFactory() + .filterNot(_.isEmpty) + 
.map(Util.getObj[CheckpointManagerFactory](_).getCheckpointManager(config, samzaContainerMetrics.registry)) + .orNull info("Got checkpoint manager: %s" format checkpointManager) // create a map of consumers with callbacks to pass to the OffsetManager @@ -415,8 +430,26 @@ object SamzaContainer extends Logging { val singleThreadMode = config.getSingleThreadMode info("Got single thread mode: " + singleThreadMode) + val taskClassName = config.getTaskClass.orNull + info("Got task class name: %s" format taskClassName) + + if (taskClassName == null && taskFactory == null) { + throw new SamzaException("Either the task class name or the task factory instance is required.") + } + + val isAsyncTask: Boolean = + if (taskFactory != null) { + taskFactory.isInstanceOf[AsyncStreamTaskFactory] + } else { + classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName)) + } + + if (isAsyncTask) { + info("Got an AsyncStreamTask implementation.") + } + if(singleThreadMode && isAsyncTask) { - throw new SamzaException("AsyncStreamTask %s cannot run on single thread mode." 
format taskClassName) + throw new SamzaException("AsyncStreamTask cannot run on single thread mode.") } val threadPoolSize = config.getThreadPoolSize @@ -443,12 +476,23 @@ object SamzaContainer extends Logging { val storeWatchPaths = new util.HashSet[Path]() storeWatchPaths.add(defaultStoreBaseDir.toPath) - val taskInstances: Map[TaskName, TaskInstance[_]] = containerModel.getTasks.values.map(taskModel => { + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { debug("Setting up task instance: %s" format taskModel) val taskName = taskModel.getTaskName - val taskObj = Class.forName(taskClassName).newInstance + val taskObj = if (taskFactory != null) { + debug("Using task factory to create task instance") + taskFactory match { + case tf: AsyncStreamTaskFactory => tf.createInstance() + case tf: StreamTaskFactory => tf.createInstance() + case _ => + throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory") + } + } else { + debug("Using task class name: %s to create instance" format taskClassName) + Class.forName(taskClassName).newInstance + } val task = if (!singleThreadMode && !isAsyncTask) // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool @@ -464,18 +508,21 @@ object SamzaContainer extends Logging { .map { case (storeName, changeLogSystemStream) => val systemConsumer = systemFactories - .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config." format (changeLogSystemStream, storeName))) + .getOrElse(changeLogSystemStream.getSystem, + throw new SamzaException("Changelog system %s for store %s does not " + + "exist in the config." 
format (changeLogSystemStream, storeName))) .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry) samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName) (storeName, systemConsumer) - }.toMap + } info("Got store consumers: %s" format storeConsumers) var loggedStorageBaseDir: File = null if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { val jobNameAndId = Util.getJobNameAndId(config) - loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) + loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) } else { warn("No override was provided for logged store base directory. This disables local state re-use on " + "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + @@ -496,11 +543,13 @@ object SamzaContainer extends Logging { null } val keySerde = config.getStorageKeySerde(storeName) match { - case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde)) + case Some(keySerde) => serdes.getOrElse(keySerde, + throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde)) case _ => null } val msgSerde = config.getStorageMsgSerde(storeName) match { - case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde)) + case Some(msgSerde) => serdes.getOrElse(msgSerde, + throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." 
format msgSerde)) case _ => null } val storeBaseDir = if(changeLogSystemStreamPartition != null) { @@ -528,7 +577,7 @@ object SamzaContainer extends Logging { taskStores = taskStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, - jobModel.maxChangeLogStreamPartitions, + maxChangeLogStreamPartitions, streamMetadataCache = streamMetadataCache, storeBaseDir = defaultStoreBaseDir, loggedStoreBaseDir = loggedStorageBaseDir, @@ -541,7 +590,7 @@ object SamzaContainer extends Logging { info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) - def createTaskInstance[T] (task: T ): TaskInstance[T] = new TaskInstance[T]( + def createTaskInstance(task: Any): TaskInstance = new TaskInstance( task = task, taskName = taskName, config = config, @@ -632,7 +681,7 @@ object SamzaContainer extends Logging { class SamzaContainer( containerContext: SamzaContainerContext, - taskInstances: Map[TaskName, TaskInstance[_]], + taskInstances: Map[TaskName, TaskInstance], runLoop: Runnable, consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, @@ -648,6 +697,17 @@ class SamzaContainer( taskThreadPool: ExecutorService = null) extends Runnable with Logging { val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L) + private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1) + + def awaitStart(timeoutMs: Long): Boolean = { + try { + runLoopStartLatch.await(timeoutMs, TimeUnit.MILLISECONDS) + } catch { + case ie: InterruptedException => + error("Interrupted while waiting for runloop to start!", ie) + throw ie + } + } def run { try { @@ -664,8 +724,9 @@ class SamzaContainer( startConsumers startSecurityManger - info("Entering run loop.") addShutdownHook + runLoopStartLatch.countDown() + info("Entering run loop.") runLoop.run } catch { case e: Throwable => @@ -689,6 +750,13 @@ class SamzaContainer( } } + def shutdown() = { + runLoop match { + case runLoop: RunLoop => runLoop.shutdown 
+ case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown() + } + } + def startDiskSpaceMonitor: Unit = { if (diskSpaceMonitor != null) { info("Starting disk space monitor") @@ -746,9 +814,11 @@ class SamzaContainer( localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl) } catch { case uhe: UnknownHostException => - warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op + warn("Received UnknownHostException when persisting locality info for container %d: " + + "%s" format (containerContext.id, uhe.getMessage)) //No-op case unknownException: Throwable => - warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage)) + warn("Received an exception when persisting locality info for container %d: " + + "%s" format (containerContext.id, unknownException.getMessage)) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a3a5ac7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 26a8f5f..e07fcf4 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -43,8 +43,8 @@ import org.apache.samza.util.Logging import scala.collection.JavaConversions._ -class TaskInstance[T]( - task: T, +class TaskInstance( + task: Any, val taskName: TaskName, config: Config, val metrics: TaskInstanceMetrics, @@ -84,7 +84,8 @@ class TaskInstance[T]( // store the (ssp -> if this ssp is catched up) mapping. "catched up" // means the same ssp in other taskInstances have the same offset as // the one here. 
- var ssp2catchedupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = scala.collection.mutable.Map[SystemStreamPartition, Boolean]() + var ssp2catchedupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = + scala.collection.mutable.Map[SystemStreamPartition, Boolean]() systemStreamPartitions.foreach(ssp2catchedupMapping += _ -> false) def registerMetrics { @@ -140,7 +141,8 @@ class TaskInstance[T]( }) } - def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator, callbackFactory: TaskCallbackFactory = null) { + def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator, + callbackFactory: TaskCallbackFactory = null) { metrics.processes.inc if (!ssp2catchedupMapping.getOrElse(envelope.getSystemStreamPartition, @@ -151,7 +153,8 @@ class TaskInstance[T]( if (ssp2catchedupMapping(envelope.getSystemStreamPartition)) { metrics.messagesActuallyProcessed.inc - trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) + trace("Processing incoming message envelope for taskName and SSP: %s, %s" + format (taskName, envelope.getSystemStreamPartition)) if (isAsyncTask) { exceptionHandler.maybeHandle { @@ -163,7 +166,8 @@ class TaskInstance[T]( task.asInstanceOf[StreamTask].process(envelope, collector, coordinator) } - trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) + trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" + format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset) } @@ -173,7 +177,7 @@ class TaskInstance[T]( def endOfStream(coordinator: ReadableCoordinator): Unit = { if (isEndOfStreamListenerTask) { exceptionHandler.maybeHandle { - task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, 
coordinator); + task.asInstanceOf[EndOfStreamListenerTask].onEndOfStream(collector, coordinator) } } } @@ -230,7 +234,8 @@ class TaskInstance[T]( override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) - def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s endofstreamlistener=%s]" format (taskName, isWindowableTask, isClosableTask, isEndOfStreamListenerTask) + def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s endofstreamlistener=%s]" format + (taskName, isWindowableTask, isClosableTask, isEndOfStreamListenerTask) /** * From the envelope, check if this SSP has catched up with the starting offset of the SSP