This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new aeeaf0b97 SAMZA-2790: Cleanup RunLoop constructor explosion (#1680)
aeeaf0b97 is described below
commit aeeaf0b9782d03932da4ee3da9e15ef9e28ed313
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Tue Aug 29 08:49:32 2023 -0700
SAMZA-2790: Cleanup RunLoop constructor explosion (#1680)
Description:
RunLoop currently takes a lot of parameters, and its constructor has grown
to the point where it is unmanageable, with multiple overloads. Introducing a
new configuration requires a lot of updates to existing tests and components,
even when the new parameter has no effect on those usages.
With this PR, we should be able to decouple the different users of RunLoop and
enable these components to have their own scoped config. For example,
SideInputsManager can now have its own set of run loop parameters without
having to tie itself to TaskConfig.
Changes:
Introduce RunLoopConfig, a container object to hold all required parameters
for runloop from Config.
Remove existing overloads of constructor
Simplify the constructor to take RunLoopConfig and initialize the necessary
components and fields
Introduce SideInputRunLoopConfig, a subclass of RunLoopConfig to be
used within SideInputsManager
Modify RunLoopFactory create method signature
Clean up ApplicationUtil by moving isHighLevelApiJob to ApplicationConfig,
and add unit tests
API Changes:
No external API change
---
.../apache/samza/application/ApplicationUtil.java | 9 --
.../org/apache/samza/config/ApplicationConfig.java | 3 +
.../org/apache/samza/config/RunLoopConfig.java | 84 ++++++++++++++++
.../java/org/apache/samza/container/RunLoop.java | 92 +++++++++--------
.../org/apache/samza/container/RunLoopFactory.java | 54 +---------
.../apache/samza/container/SamzaContainer.scala | 11 +--
.../apache/samza/storage/SideInputsManager.java | 53 +++++++---
.../samza/application/TestApplicationUtil.java | 13 ---
.../apache/samza/config/TestApplicationConfig.java | 56 +++++++++++
.../org/apache/samza/container/TestRunLoop.java | 109 +++++++++++----------
10 files changed, 293 insertions(+), 191 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index 07badc782..85762056f 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -64,13 +64,4 @@ public class ApplicationUtil {
}
return new LegacyTaskApplication(taskClassOption.get());
}
-
- /**
- * Determines if the job is a Samza high-level job.
- * @param config config
- * */
- public static boolean isHighLevelApiJob(Config config) {
- final ApplicationConfig applicationConfig = new ApplicationConfig(config);
- return applicationConfig.getAppApiType() == ApplicationApiType.HIGH_LEVEL;
- }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 96781ad0c..2363bc753 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -114,4 +114,7 @@ public class ApplicationConfig extends MapConfig {
public ApplicationApiType getAppApiType() {
return ApplicationApiType.valueOf(get(APP_API_TYPE,
ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
}
+ public boolean isHighLevelApiJob() {
+ return getAppApiType() == ApplicationApiType.HIGH_LEVEL;
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
new file mode 100644
index 000000000..ee1d7f71f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A container class to hold run loop related configurations to prevent
constructor explosion
+ * in {@link org.apache.samza.container.RunLoop}
+ */
+public class RunLoopConfig extends MapConfig {
+ private static final String CONTAINER_DISK_QUOTA_DELAY_MAX_MS =
"container.disk.quota.delay.max.ms";
+ private ApplicationConfig appConfig;
+ private JobConfig jobConfig;
+ private TaskConfig taskConfig;
+
+ public RunLoopConfig(Config config) {
+ super(config);
+ this.appConfig = new ApplicationConfig(config);
+ this.jobConfig = new JobConfig(config);
+ this.taskConfig = new TaskConfig(config);
+ }
+
+ public int getMaxConcurrency() {
+ return taskConfig.getMaxConcurrency();
+ }
+
+ public long getTaskCallbackTimeoutMs() {
+ return taskConfig.getCallbackTimeoutMs();
+ }
+
+ public long getDrainCallbackTimeoutMs() {
+ return taskConfig.getDrainCallbackTimeoutMs();
+ }
+
+ public boolean asyncCommitEnabled() {
+ return taskConfig.getAsyncCommit();
+ }
+
+ public long getWindowMs() {
+ return taskConfig.getWindowMs();
+ }
+
+ public long getCommitMs() {
+ return taskConfig.getCommitMs();
+ }
+
+ public long getMaxIdleMs() {
+ return taskConfig.getMaxIdleMs();
+ }
+
+ public long getMaxThrottlingDelayMs() {
+ return getLong(CONTAINER_DISK_QUOTA_DELAY_MAX_MS,
TimeUnit.SECONDS.toMillis(1));
+ }
+
+ public String getRunId() {
+ return appConfig.getRunId();
+ }
+
+ public int getElasticityFactor() {
+ return jobConfig.getElasticityFactor();
+ }
+
+ public boolean isHighLevelApiJob() {
+ return appConfig.isHighLevelApiJob();
+ }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 65acc9506..25c924f00 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.RunLoopConfig;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
@@ -94,61 +95,66 @@ public class RunLoop implements Runnable, Throttleable {
private final boolean isHighLevelApiJob;
private boolean isDraining = false;
+ /*
+ * Order of initialization
+ * 1. Initialize fields with arguments passed to the constructor
+ * 2. Initialize fields that are constructed within the constructor
+ * 3. Initialize fields that are derived and constructed using the
arguments passed to the constructor
+ */
public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
ExecutorService threadPool,
SystemConsumers consumerMultiplexer,
- int maxConcurrency,
- long windowMs,
- long commitMs,
- long callbackTimeoutMs,
- long drainCallbackTimeoutMs,
- long maxThrottlingDelayMs,
- long maxIdleMs,
- SamzaContainerMetrics containerMetrics,
- HighResolutionClock clock,
- boolean isAsyncCommitEnabled) {
- this(runLoopTasks, threadPool, consumerMultiplexer, maxConcurrency,
windowMs, commitMs, callbackTimeoutMs,
- drainCallbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs,
containerMetrics, clock, isAsyncCommitEnabled, 1, null, false);
- }
-
- public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
- ExecutorService threadPool,
- SystemConsumers consumerMultiplexer,
- int maxConcurrency,
- long windowMs,
- long commitMs,
- long callbackTimeoutMs,
- long drainCallbackTimeoutMs,
- long maxThrottlingDelayMs,
- long maxIdleMs,
SamzaContainerMetrics containerMetrics,
HighResolutionClock clock,
- boolean isAsyncCommitEnabled,
- int elasticityFactor,
- String runId,
- boolean isHighLevelApiJob) {
+ RunLoopConfig config) {
this.threadPool = threadPool;
this.consumerMultiplexer = consumerMultiplexer;
this.containerMetrics = containerMetrics;
- this.windowMs = windowMs;
- this.commitMs = commitMs;
- this.maxConcurrency = maxConcurrency;
- this.callbackTimeoutMs = callbackTimeoutMs;
- this.drainCallbackTimeoutMs = drainCallbackTimeoutMs;
- this.maxIdleMs = maxIdleMs;
- this.callbackTimer = (callbackTimeoutMs > 0) ?
Executors.newSingleThreadScheduledExecutor() : null;
- this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
- this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
- this.latch = new Object();
- this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+
+ this.windowMs = config.getWindowMs();
+ log.info("Got window milliseconds: {}.", windowMs);
+
+ this.commitMs = config.getCommitMs();
+ log.info("Got commit milliseconds: {}.", commitMs);
+
+ this.maxConcurrency = config.getMaxConcurrency();
+ log.info("Got task concurrency: {}.", maxConcurrency);
+
+ this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs();
+ log.info("Got callback timeout for task in milliseconds: {}.",
callbackTimeoutMs);
+
+ this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs();
+ log.info("Got callback timeout for drain in milliseconds: {}.",
drainCallbackTimeoutMs);
+
+ this.maxIdleMs = config.getMaxIdleMs();
+ log.info("Got max idle in milliseconds: {}.", maxIdleMs);
+
this.clock = clock;
// assign runId before creating workers. As the inner AsyncTaskWorker
class is not static, it relies on
// the outer class fields to be init first
- this.runId = runId;
- this.isHighLevelApiJob = isHighLevelApiJob;
- this.isAsyncCommitEnabled = isAsyncCommitEnabled;
- this.elasticityFactor = elasticityFactor;
+ this.runId = config.getRunId();
+ log.info("Got current run Id: {}.", runId);
+
+ this.isHighLevelApiJob = config.isHighLevelApiJob();
+ if (isHighLevelApiJob) {
+ log.info("The application uses high-level API.");
+ } else {
+ log.info("The application doesn't use high-level API.");
+ }
+
+ this.isAsyncCommitEnabled = config.asyncCommitEnabled();
+ log.info("Got async commit enabled={}.", isAsyncCommitEnabled);
+
+ this.elasticityFactor = config.getElasticityFactor();
+ log.info("Got elasticity factor: {}.", elasticityFactor);
+
+ this.latch = new Object();
+ this.workerTimer = Executors.newSingleThreadScheduledExecutor();
+
+ this.callbackTimer = (callbackTimeoutMs > 0) ?
Executors.newSingleThreadScheduledExecutor() : null;
+ this.callbackExecutor = new
ThrottlingScheduler(config.getMaxThrottlingDelayMs());
+ this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
for (RunLoopTask task : runLoopTasks.values()) {
diff --git
a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 8dac7a1ab..1de654d5d 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -19,7 +19,8 @@
package org.apache.samza.container;
-import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.RunLoopConfig;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
@@ -37,46 +38,9 @@ public class RunLoopFactory {
public static Runnable
createRunLoop(scala.collection.immutable.Map<TaskName, RunLoopTask>
taskInstances,
SystemConsumers consumerMultiplexer,
ExecutorService threadPool,
- long maxThrottlingDelayMs,
SamzaContainerMetrics containerMetrics,
- TaskConfig taskConfig,
HighResolutionClock clock,
- int elasticityFactor,
- String runId,
- boolean isHighLevelApiJob) {
-
- long taskWindowMs = taskConfig.getWindowMs();
-
- log.info("Got window milliseconds: {}.", taskWindowMs);
-
- long taskCommitMs = taskConfig.getCommitMs();
-
- log.info("Got commit milliseconds: {}.", taskCommitMs);
-
- int taskMaxConcurrency = taskConfig.getMaxConcurrency();
- log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);
-
- boolean isAsyncCommitEnabled = taskConfig.getAsyncCommit();
- log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);
-
- long callbackTimeout = taskConfig.getCallbackTimeoutMs();
- log.info("Got callbackTimeout: {}.", callbackTimeout);
-
- long drainCallbackTimeout = taskConfig.getDrainCallbackTimeoutMs();
- log.info("Got callback timeout for drain: {}.", callbackTimeout);
-
- long maxIdleMs = taskConfig.getMaxIdleMs();
- log.info("Got maxIdleMs: {}.", maxIdleMs);
-
- log.info("Got elasticity factor: {}.", elasticityFactor);
-
- log.info("Got current run Id: {}.", runId);
-
- if (isHighLevelApiJob) {
- log.info("The application uses high-level API.");
- } else {
- log.info("The application doesn't use high-level API.");
- }
+ Config config) {
log.info("Run loop in asynchronous mode.");
@@ -84,18 +48,8 @@ public class RunLoopFactory {
JavaConverters.mapAsJavaMapConverter(taskInstances).asJava(),
threadPool,
consumerMultiplexer,
- taskMaxConcurrency,
- taskWindowMs,
- taskCommitMs,
- callbackTimeout,
- drainCallbackTimeout,
- maxThrottlingDelayMs,
- maxIdleMs,
containerMetrics,
clock,
- isAsyncCommitEnabled,
- elasticityFactor,
- runId,
- isHighLevelApiJob);
+ new RunLoopConfig(config));
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 999bdb7dd..689b5aa28 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -29,7 +29,6 @@ import java.util.function.Consumer
import java.util.{Base64, Optional}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.SamzaException
-import org.apache.samza.application.ApplicationUtil
import org.apache.samza.checkpoint.{Checkpoint, CheckpointListener,
OffsetManager, OffsetManagerMetrics}
import org.apache.samza.clustermanager.StandbyTaskUtil
import org.apache.samza.config.{StreamConfig, _}
@@ -624,21 +623,13 @@ object SamzaContainer extends Logging {
(taskName, taskInstance)
}).toMap
- val maxThrottlingDelayMs =
config.getLong("container.disk.quota.delay.max.ms",
TimeUnit.SECONDS.toMillis(1))
-
- val isHighLevelApiJob = ApplicationUtil.isHighLevelApiJob(config)
-
val runLoop: Runnable = RunLoopFactory.createRunLoop(
taskInstances,
consumerMultiplexer,
taskThreadPool,
- maxThrottlingDelayMs,
samzaContainerMetrics,
- taskConfig,
clock,
- jobConfig.getElasticityFactor,
- appConfig.getRunId,
- isHighLevelApiJob)
+ config)
val systemStatisticsMonitor : SystemStatisticsMonitor = new
StatisticsMonitorImpl()
systemStatisticsMonitor.registerListener(
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 3ccaf1a28..128d7db46 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.config.RunLoopConfig;
import scala.collection.JavaConversions;
import java.io.File;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -255,24 +255,13 @@ public class SideInputsManager {
final ApplicationConfig applicationConfig = new
ApplicationConfig(config);
+ SideInputRunLoopConfig runLoopConfig = new
SideInputRunLoopConfig(config);
this.sideInputRunLoop = new RunLoop(sideInputTasks,
null, // all operations are executed in the main runloop thread
this.sideInputSystemConsumers,
- 1, // single message in flight per task
- -1, // no windowing
- taskConfig.getCommitMs(),
- taskConfig.getCallbackTimeoutMs(),
- taskConfig.getDrainCallbackTimeoutMs(),
- // TODO consolidate these container configs SAMZA-2275
- this.config.getLong("container.disk.quota.delay.max.ms",
TimeUnit.SECONDS.toMillis(1)),
- taskConfig.getMaxIdleMs(),
sideInputContainerMetrics,
System::nanoTime,
- false,
- DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR,
- applicationConfig.getRunId(),
- ApplicationUtil.isHighLevelApiJob(config)
- ); // commit must be synchronous to ensure integrity of state flush
+ runLoopConfig);
try {
sideInputsExecutor.submit(() -> {
@@ -507,4 +496,40 @@ public class SideInputsManager {
}
return true;
}
+
+ /**
+ * Decorated {@link RunLoopConfig} used for side inputs flow in samza. The
properties of {@link RunLoop} for side
+ * input use case is as follows
+ * 1. Max concurrency within a side input task is always <i>1</i>. This is
critical as ordering of OPs (CRUD) for
+ * side input stores needs to be followed to recreate the correct
snapshot of the external data
+ * 2. Side input tasks don't have any windows. We only allow users to
plugin process functions
+ * 3. Commits are synchronous as we need to ensure data integrity upon
state flushes
+ */
+ private static class SideInputRunLoopConfig extends RunLoopConfig {
+
+ public SideInputRunLoopConfig(Config config) {
+ super(config);
+ }
+
+ @Override
+ public int getMaxConcurrency() {
+ return 1;
+ }
+
+ @Override
+ public long getWindowMs() {
+ return -1;
+ }
+
+ @Override
+ public int getElasticityFactor() {
+ return DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR;
+ }
+
+ // commit must be synchronous to ensure integrity of state flush
+ @Override
+ public boolean asyncCommitEnabled() {
+ return false;
+ }
+ }
}
diff --git
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
index abcd81239..ba25fa072 100644
---
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
+++
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -86,19 +86,6 @@ public class TestApplicationUtil {
ApplicationUtil.fromConfig(new MapConfig(configMap));
}
- @Test
- public void testIsHighLevelJob() {
- final Map<String, String> configMap = new HashMap<>();
- configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.HIGH_LEVEL.name());
- assertTrue(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
-
- configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.LOW_LEVEL.name());
- assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
-
- configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.LEGACY.name());
- assertFalse(ApplicationUtil.isHighLevelApiJob(new MapConfig(configMap)));
- }
-
/**
* Test class of {@link TaskApplication} for unit tests
*/
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestApplicationConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestApplicationConfig.java
new file mode 100644
index 000000000..bd6496cb3
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/config/TestApplicationConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.ApplicationApiType;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestApplicationConfig {
+ @Test
+ public void isHighLevelJob() {
+ final Map<String, String> configMap = new HashMap<>();
+ configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.HIGH_LEVEL.name());
+ ApplicationConfig applicationConfig = new ApplicationConfig(new
MapConfig(configMap));
+
+ assertTrue(applicationConfig.isHighLevelApiJob());
+ }
+
+ @Test
+ public void isHighLevelJobWithLowLevelJob() {
+ final Map<String, String> configMap = new HashMap<>();
+ configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.LOW_LEVEL.name());
+ ApplicationConfig applicationConfig = new ApplicationConfig(new
MapConfig(configMap));
+
+ assertFalse(applicationConfig.isHighLevelApiJob());
+ }
+
+ @Test
+ public void isHighLevelJobWithLegacyJob() {
+ final Map<String, String> configMap = new HashMap<>();
+ configMap.put(ApplicationConfig.APP_API_TYPE,
ApplicationApiType.LEGACY.name());
+ ApplicationConfig applicationConfig = new ApplicationConfig(new
MapConfig(configMap));
+
+ assertFalse(applicationConfig.isHighLevelApiJob());
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 2d94d87b1..c3a030a9d 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.config.RunLoopConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
@@ -39,10 +40,13 @@ import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@@ -60,6 +64,10 @@ public class TestRunLoop {
private final long drainCallbackTimeoutMs = 0;
private final long maxThrottlingDelayMs = 0;
private final long maxIdleMs = 10;
+
+ private final int elasticityFactor = 1;
+
+ private final boolean isHighLevelApiJob = false;
private final Partition p0 = new Partition(0);
private final Partition p1 = new Partition(1);
private final TaskName taskName0 = new TaskName(p0.toString());
@@ -85,6 +93,25 @@ public class TestRunLoop {
@Rule
public Timeout maxTestDurationInSeconds = Timeout.seconds(120);
+ @Mock
+ private RunLoopConfig mockRunLoopConfig;
+
+ @Before
+ public void init() {
+ MockitoAnnotations.initMocks(this);
+ when(mockRunLoopConfig.getMaxConcurrency()).thenReturn(1);
+ when(mockRunLoopConfig.getWindowMs()).thenReturn(windowMs);
+ when(mockRunLoopConfig.getCommitMs()).thenReturn(commitMs);
+
when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(callbackTimeoutMs);
+
when(mockRunLoopConfig.getDrainCallbackTimeoutMs()).thenReturn(drainCallbackTimeoutMs);
+ when(mockRunLoopConfig.getMaxIdleMs()).thenReturn(maxIdleMs);
+
when(mockRunLoopConfig.getMaxThrottlingDelayMs()).thenReturn(maxThrottlingDelayMs);
+ when(mockRunLoopConfig.asyncCommitEnabled()).thenReturn(false);
+ when(mockRunLoopConfig.getElasticityFactor()).thenReturn(elasticityFactor);
+ when(mockRunLoopConfig.getRunId()).thenReturn(runId);
+ when(mockRunLoopConfig.isHighLevelApiJob()).thenReturn(false);
+ }
+
@Test
public void testProcessMultipleTasks() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
@@ -96,9 +123,7 @@ public class TestRunLoop {
tasks.put(taskName0, task0);
tasks.put(taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false, 1, "foo", false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA11).thenReturn(sspA0EndOfStream).thenReturn(
sspA1EndOfStream).thenReturn(null);
runLoop.run();
@@ -117,9 +142,7 @@ public class TestRunLoop {
RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
runLoop.run();
InOrder inOrder = inOrder(task0);
@@ -162,11 +185,12 @@ public class TestRunLoop {
return null;
}).when(task0).process(eq(envelopeA01), any(), any());
+
when(mockRunLoopConfig.getMaxConcurrency()).thenReturn(maxMessagesInFlight);
+
Map<TaskName, RunLoopTask> tasks = new HashMap<>();
tasks.put(taskName0, task0);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA01).thenReturn(null);
runLoop.run();
@@ -215,11 +239,10 @@ public class TestRunLoop {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
when(consumerMultiplexer.choose(false)).thenReturn(envelope00).thenReturn(envelope01).thenReturn(sspA0EndOfStream).thenReturn(null);
+ when(mockRunLoopConfig.getElasticityFactor()).thenReturn(2);
+
Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs, 0,
containerMetrics, () -> 0L,
- false, 2, null, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
runLoop.run();
verify(task0).process(eq(envelope00), any(), any());
@@ -243,10 +266,7 @@ public class TestRunLoop {
.thenReturn(sspA0Drain).thenReturn(sspA1Drain);
Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0,
taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs, 0,
containerMetrics, () -> 0L,
- false, 1, runId, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
runLoop.run();
// check if process was called once for each task
@@ -274,10 +294,7 @@ public class TestRunLoop {
.thenReturn(sspA0Drain).thenReturn(sspA1Drain).thenReturn(sspB0Drain).thenReturn(sspB1Drain);
Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0,
taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs, 0,
containerMetrics, () -> 0L,
- false, 1, runId, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
runLoop.run();
// check if process was called twice for each task
@@ -294,7 +311,6 @@ public class TestRunLoop {
public void testWindow() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
- int maxMessagesInFlight = 1;
long windowMs = 1;
RunLoopTask task = getMockRunLoopTask(taskName0, sspA0);
when(task.isWindowableTask()).thenReturn(true);
@@ -308,11 +324,12 @@ public class TestRunLoop {
return null;
}).when(task).window(any());
+ when(mockRunLoopConfig.getWindowMs()).thenReturn(windowMs);
+
Map<TaskName, RunLoopTask> tasks = new HashMap<>();
tasks.put(taskName0, task);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(null);
runLoop.run();
@@ -342,9 +359,7 @@ public class TestRunLoop {
tasks.put(this.taskName0, task0);
tasks.put(taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
//have a null message in between to make sure task0 finishes processing
and invoke the commit
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA11).thenReturn(null);
@@ -380,9 +395,7 @@ public class TestRunLoop {
tasks.put(this.taskName0, task0);
tasks.put(taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
//have a null message in between to make sure task0 finishes processing
and invoke the commit
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA11).thenReturn(null);
@@ -399,7 +412,6 @@ public class TestRunLoop {
public void testShutdownOnConsensus() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
- int maxMessagesInFlight = 1;
RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
doAnswer(invocation -> {
ReadableCoordinator coordinator = invocation.getArgumentAt(1,
ReadableCoordinator.class);
@@ -426,8 +438,7 @@ public class TestRunLoop {
tasks.put(taskName0, task0);
tasks.put(taskName1, task1);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
// consensus is reached after envelope1 is processed.
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA11).thenReturn(null);
runLoop.run();
@@ -451,9 +462,7 @@ public class TestRunLoop {
tasks.put(taskName0, task0);
tasks.put(taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false))
.thenReturn(envelopeA00)
.thenReturn(envelopeA11)
@@ -516,11 +525,12 @@ public class TestRunLoop {
return null;
}).when(task0).endOfStream(any());
+
when(mockRunLoopConfig.getMaxConcurrency()).thenReturn(maxMessagesInFlight);
+
Map<TaskName, RunLoopTask> tasks = new HashMap<>();
tasks.put(taskName0, task0);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA01).thenReturn(sspA0EndOfStream)
.thenAnswer(invocation -> {
// this ensures that the end of stream message has passed through
run loop BEFORE the last remaining in flight message completes
@@ -571,12 +581,12 @@ public class TestRunLoop {
return null;
}).when(task0).drain(any());
+
when(mockRunLoopConfig.getMaxConcurrency()).thenReturn(maxMessagesInFlight);
+
Map<TaskName, RunLoopTask> tasks = new HashMap<>();
tasks.put(taskName0, task0);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false,
- 1, runId, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA01).thenReturn(sspA0Drain)
.thenAnswer(invocation -> {
// this ensures that the drain message has passed through run loop
BEFORE the flight message
@@ -606,9 +616,7 @@ public class TestRunLoop {
tasks.put(taskName0, task0);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(sspA0EndOfStream).thenReturn(null);
runLoop.run();
@@ -628,10 +636,7 @@ public class TestRunLoop {
tasks.put(taskName0, task0);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false,
- 1, runId, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(sspA0Drain).thenReturn(null);
runLoop.run();
@@ -692,11 +697,13 @@ public class TestRunLoop {
return null;
}).when(task0).commit();
+
when(mockRunLoopConfig.getMaxConcurrency()).thenReturn(maxMessagesInFlight);
+ when(mockRunLoopConfig.asyncCommitEnabled()).thenReturn(true);
+
Map<TaskName, RunLoopTask> tasks = new HashMap<>();
tasks.put(taskName0, task0);
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, true);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false)).thenReturn(envelopeA00).thenReturn(envelopeA01).thenReturn(null);
runLoop.run();
@@ -719,9 +726,7 @@ public class TestRunLoop {
Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, drainCallbackTimeoutMs, maxThrottlingDelayMs,
maxIdleMs, containerMetrics, () -> 0L, false);
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer,
containerMetrics, () -> 0L, mockRunLoopConfig);
when(consumerMultiplexer.choose(false))
.thenReturn(envelopeA00)