Repository: samza Updated Branches: refs/heads/master a4174309a -> ad8ba96e7
SAMZA-1271; Guarantee predictable, deterministic order for operator initialization and finalization

Currently, the order of initialization of operators in the Samza high-level API is not deterministic. The non-determinism arises from two primary causes:
- No fixed order of iteration for all subscribed `OperatorSpec`s for a given `MessageStream`
- No fixed order of iteration for all the `OperatorImpl`s in the `OperatorImplGraph`

We aim to provide the following 2 guarantees in this patch. For any 2 operators A, B in the graph, if B consumes the output of A:
- A is initialized before B is initialized
- A is finalized only after B is finalized

Author: vjagadish1989 <jvenk...@linkedin.com>
Reviewers: Prateek Maheshwari<pmahe...@linkedin.com>
Closes #211 from vjagadish1989/deterministic_order

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad8ba96e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad8ba96e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad8ba96e
Branch: refs/heads/master
Commit: ad8ba96e7ee2e98fe62fe0af07cb93e6153c5134
Parents: a417430
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Mon Jun 5 10:27:23 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Jun 5 10:27:23 2017 -0700
----------------------------------------------------------------------
.../operators/functions/ClosableFunction.java | 4 +
.../operators/functions/InitableFunction.java | 5 +
.../samza/operators/MessageStreamImpl.java | 6 +-
.../apache/samza/operators/StreamGraphImpl.java | 7 +-
.../samza/operators/impl/OperatorImplGraph.java | 19 ++--
.../samza/operators/spec/OperatorSpecs.java | 11 ++
.../apache/samza/task/StreamOperatorTask.java | 6 +-
.../samza/operators/TestStreamGraphImpl.java | 29 +++++
.../operators/impl/TestOperatorImplGraph.java | 111 +++++++++++++++++++
9 files changed, 180 insertions(+), 18 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java index 2e73652..fe7137f 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java @@ -26,6 +26,10 @@ import org.apache.samza.annotation.InterfaceStability; * * <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc. * + * <p> Order of finalization: {@link ClosableFunction}s are invoked in the reverse topological order of operators in the + * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results + * from operator A, then operator B is guaranteed to be closed before operator A. + * */ @InterfaceStability.Unstable public interface ClosableFunction { http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java index 4f9fad7..b08c6cd 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -25,6 +25,11 @@ import org.apache.samza.task.TaskContext; /** * A function that can be initialized before execution. 
+ * + * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the + * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results + * from operator A, then operator A is guaranteed to be initialized before operator B. + * */ @InterfaceStability.Unstable public interface InitableFunction { http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 0c84e90..9912f95 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -41,7 +41,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.function.Function; @@ -61,8 +61,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> { /** * The set of operators that consume the messages in this {@link MessageStream} + * + * Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators. 
*/ - private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>(); + private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>(); /** * Default constructor http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index 1f1d282..fcce5eb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -31,8 +31,8 @@ import org.apache.samza.system.StreamSpec; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; @@ -51,8 +51,9 @@ public class StreamGraphImpl implements StreamGraph { */ private int opId = 0; - private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>(); - private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>(); + // Using LHM for deterministic order in initializing and closing operators. 
+ private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>(); + private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>(); private final ApplicationRunner runner; private final Config config; http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 78a6d1e..e99b3ee 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators.impl; +import com.google.common.collect.Lists; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; @@ -31,9 +32,12 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; @@ -47,8 +51,10 @@ public class OperatorImplGraph { * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different * input {@link MessageStreamImpl}s. + * + * Using LHM for deterministic ordering in initializing and closing operators. 
*/ - private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>(); + private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>(); /** * A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph. @@ -99,13 +105,10 @@ public class OperatorImplGraph { return Collections.unmodifiableCollection(this.rootOperators.values()); } - /** - * Get all {@link OperatorImpl}s for the graph. - * - * @return an unmodifiable view of all {@link OperatorImpl}s for the graph - */ - public Collection<OperatorImpl> getAllOperators() { - return Collections.unmodifiableCollection(this.operatorImpls.values()); + public void close() { + List<OperatorImpl> initializationOrder = new ArrayList<>(operatorImpls.values()); + List<OperatorImpl> finalizationOrder = Lists.reverse(initializationOrder); + finalizationOrder.forEach(OperatorImpl::close); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 0b93bbe..66e2c58 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -71,6 +71,11 @@ public class OperatorSpecs { public void init(Config config, TaskContext context) { mapFn.init(config, context); } + + @Override + public void close() { + mapFn.close(); + } }, nextStream, OperatorSpec.OpCode.MAP, opId); } @@ -101,6 +106,12 @@ public class OperatorSpecs { public void init(Config config, TaskContext context) { filterFn.init(config, context); } + + @Override + public void close() { + filterFn.close(); + } + }, nextStream, OperatorSpec.OpCode.FILTER, opId); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index a5f3f85..50ae775 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -22,7 +22,6 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.impl.OperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; import org.apache.samza.operators.impl.RootOperatorImpl; import org.apache.samza.operators.stream.InputStreamInternal; @@ -32,7 +31,6 @@ import org.apache.samza.system.SystemStream; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; -import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -142,8 +140,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo if (this.contextManager != null) { this.contextManager.close(); } - - Collection<OperatorImpl> allOperators = operatorImplGraph.getAllOperators(); - allOperators.forEach(OperatorImpl::close); + operatorImplGraph.close(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java index 666bbb8..9d95217 100644 --- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -18,11 +18,13 @@ */ package org.apache.samza.operators; +import junit.framework.Assert; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.data.MessageType; import org.apache.samza.operators.data.TestInputMessageEnvelope; import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.stream.InputStreamInternal; import org.apache.samza.operators.stream.InputStreamInternalImpl; import org.apache.samza.operators.stream.IntermediateStreamInternalImpl; import org.apache.samza.operators.stream.OutputStreamInternalImpl; @@ -30,6 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.StreamSpec; import org.junit.Test; +import java.util.ArrayList; import java.util.function.BiFunction; import java.util.function.Function; @@ -193,4 +196,30 @@ public class TestStreamGraphImpl { assertEquals(graph.getNextOpId(), 1); } + @Test + public void testGetInputStreamPreservesInsertionOrder() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1); + + StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system"); + when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2); + + StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system"); + when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3); + + graph.getInputStream("test-stream-1", (k, v) -> v); + graph.getInputStream("test-stream-2", 
(k, v) -> v); + graph.getInputStream("test-stream-3", (k, v) -> v); + + ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values()); + Assert.assertEquals(inputMessageStreams.size(), 3); + Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1); + Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2); + Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java new file mode 100644 index 0000000..67e5b46 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.task.TaskContext; +import org.apache.samza.util.SystemClock; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestOperatorImplGraph { + + @Test + public void testOperatorGraphInitAndClose() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1); + StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system"); + when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2); + + Config mockConfig = mock(Config.class); + TaskContext mockContext = createMockContext(); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + List<String> initializationOrder = new ArrayList<>(); + List<String> finalizationOrder = new ArrayList<>(); + + MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v); + MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v); + + inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder)) + .map(createMapFunction("2", initializationOrder, finalizationOrder)); + + inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder)) + .map(createMapFunction("4", initializationOrder, finalizationOrder)); + + 
OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance()); + + // Assert that initialization occurs in topological order. + implGraph.init(graph, mockConfig, mockContext); + assertEquals(initializationOrder.get(0), "1"); + assertEquals(initializationOrder.get(1), "2"); + assertEquals(initializationOrder.get(2), "3"); + assertEquals(initializationOrder.get(3), "4"); + + // Assert that finalization occurs in reverse topological order. + implGraph.close(); + assertEquals(finalizationOrder.get(0), "4"); + assertEquals(finalizationOrder.get(1), "3"); + assertEquals(finalizationOrder.get(2), "2"); + assertEquals(finalizationOrder.get(3), "1"); + } + + private TaskContext createMockContext() { + TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + return mockContext; + } + + /** + * Creates an identity map function that appends to the provided lists when init/close is invoked. + */ + private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) { + return new MapFunction<Object, Object>() { + @Override + public void init(Config config, TaskContext context) { + initializationOrder.add(id); + } + + @Override + public void close() { + finalizationOrder.add(id); + } + + @Override + public Object apply(Object message) { + return message; + } + }; + } +} +