This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9ece7fe KAFKA-10500: Allow people to add new StreamThread at runtime
(#9615)
9ece7fe is described below
commit 9ece7fe372230e848920d843af2d0d5c2fbaac4d
Author: Walker Carlson <[email protected]>
AuthorDate: Fri Dec 4 11:21:03 2020 -0800
KAFKA-10500: Allow people to add new StreamThread at runtime (#9615)
Part of KIP-663.
Reviewers: Bruno Cadonna <[email protected]>, A. Sophie Blee-Goldman
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 146 +++++++++++++++------
.../kafka/streams/state/internals/ThreadCache.java | 3 +
.../org/apache/kafka/streams/KafkaStreamsTest.java | 44 +++++++
.../integration/AdjustStreamThreadCountTest.java | 111 ++++++++++++++++
4 files changed, 262 insertions(+), 42 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 44ce307..4fb2b2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -84,6 +84,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
@@ -161,11 +162,20 @@ public class KafkaStreams implements AutoCloseable {
private final ProcessorTopology taskTopology;
private final ProcessorTopology globalTaskTopology;
private final long totalCacheSize;
+ private final StreamStateListener streamStateListener;
+ private final StateRestoreListener delegatingStateRestoreListener;
+ private final Map<Long, StreamThread.State> threadState;
+ private final ArrayList<StreamThreadStateStoreProvider> storeProviders;
+ private final UUID processId;
+ private final KafkaClientSupplier clientSupplier;
+ private final InternalTopologyBuilder internalTopologyBuilder;
GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
private StateRestoreListener globalStateRestoreListener;
private boolean oldHandler;
+ private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+ private final Object changeThreadCount = new Object();
// container states
/**
@@ -397,6 +407,7 @@ public class KafkaStreams implements AutoCloseable {
final Consumer<Throwable> handler = exception ->
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
synchronized (stateLock) {
if (state == State.CREATED) {
+ this.streamsUncaughtExceptionHandler = handler;
Objects.requireNonNull(streamsUncaughtExceptionHandler);
for (final StreamThread thread : threads) {
thread.setStreamsUncaughtExceptionHandler(handler);
@@ -755,7 +766,7 @@ public class KafkaStreams implements AutoCloseable {
this.config = config;
this.time = time;
// The application ID is a required config and hence should always have value
- final UUID processId = UUID.randomUUID();
+ processId = UUID.randomUUID();
final String userClientId =
config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId =
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
if (userClientId.length() <= 0) {
@@ -765,7 +776,7 @@ public class KafkaStreams implements AutoCloseable {
}
final LogContext logContext = new
LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
-
+ this.clientSupplier = clientSupplier;
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -792,7 +803,7 @@ public class KafkaStreams implements AutoCloseable {
ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) ->
state);
log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-
+ this.internalTopologyBuilder = internalTopologyBuilder;
// re-write the physical topology according to the config
internalTopologyBuilder.rewriteTopology(config);
@@ -820,18 +831,18 @@ public class KafkaStreams implements AutoCloseable {
throw new TopologyException("Topology has no stream threads and no
global threads, " +
"must subscribe to at least one source topic or global
table.");
}
-
+ oldHandler = false;
totalCacheSize =
config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
final long cacheSizePerThread =
getCacheSizePerThread(numStreamThreads);
final boolean hasPersistentStores =
taskTopology.hasPersistentLocalStore() ||
(hasGlobalTopology &&
globalTaskTopology.hasPersistentGlobalStore());
-
+ streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
try {
stateDirectory = new StateDirectory(config, time,
hasPersistentStores);
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}
- final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
+ delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
if (hasGlobalTopology) {
final String globalThreadId = clientId + "-GlobalStreamThread";
@@ -845,7 +856,7 @@ public class KafkaStreams implements AutoCloseable {
time,
globalThreadId,
delegatingStateRestoreListener,
- this::defaultStreamsUncaughtExceptionHandler
+ streamsUncaughtExceptionHandler
);
globalThreadState = globalStreamThread.state();
}
@@ -853,59 +864,110 @@ public class KafkaStreams implements AutoCloseable {
// use client id instead of thread client id since this admin client
may be shared among threads
adminClient =
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
- final Map<Long, StreamThread.State> threadState = new
HashMap<>(numStreamThreads);
- final ArrayList<StreamThreadStateStoreProvider> storeProviders = new
ArrayList<>();
- for (int i = 0; i < numStreamThreads; i++) {
- final StreamThread streamThread = StreamThread.create(
- internalTopologyBuilder,
- config,
- clientSupplier,
- adminClient,
- processId,
- clientId,
- streamsMetrics,
- time,
- streamsMetadataState,
- cacheSizePerThread,
- stateDirectory,
- delegatingStateRestoreListener,
- i + 1,
- KafkaStreams.this::closeToError,
- this::defaultStreamsUncaughtExceptionHandler
- );
- threads.add(streamThread);
- threadState.put(streamThread.getId(), streamThread.state());
- storeProviders.add(new
StreamThreadStateStoreProvider(streamThread));
- }
-
- ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) ->
- Math.toIntExact(threads.stream().filter(thread ->
thread.state().isAlive()).count()));
-
- final StreamStateListener streamStateListener = new
StreamStateListener(threadState, globalThreadState);
+ threadState = new HashMap<>(numStreamThreads);
+ storeProviders = new ArrayList<>();
+ streamStateListener = new StreamStateListener(threadState,
globalThreadState);
if (hasGlobalTopology) {
globalStreamThread.setStateListener(streamStateListener);
}
- for (final StreamThread thread : threads) {
- thread.setStateListener(streamStateListener);
+ for (int i = 1; i <= numStreamThreads; i++) {
+ createStreamThread(cacheSizePerThread, i);
}
+ ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics,
(metricsConfig, now) ->
+ Math.toIntExact(threads.stream().filter(thread ->
thread.state().isAlive()).count()));
+
final GlobalStateStoreProvider globalStateStoreProvider = new
GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders,
globalStateStoreProvider);
stateDirCleaner = setupStateDirCleaner();
- oldHandler = false;
maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
rocksDBMetricsRecordingService =
maybeCreateRocksDBMetricsRecordingService(clientId, config);
}
+ private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+ final StreamThread streamThread = StreamThread.create(
+ internalTopologyBuilder,
+ config,
+ clientSupplier,
+ adminClient,
+ processId,
+ clientId,
+ streamsMetrics,
+ time,
+ streamsMetadataState,
+ cacheSizePerThread,
+ stateDirectory,
+ delegatingStateRestoreListener,
+ threadIdx,
+ KafkaStreams.this::closeToError,
+ streamsUncaughtExceptionHandler
+ );
+ streamThread.setStateListener(streamStateListener);
+ threads.add(streamThread);
+ threadState.put(streamThread.getId(), streamThread.state());
+ storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+ return streamThread;
+ }
+
+ /**
+ * Adds and starts a stream thread in addition to the stream threads that
are already running in this
+ * Kafka Streams client.
+ * <p>
+ * Since the number of stream threads increases, the sizes of the caches
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * <p>
+ * Stream threads can only be added if this Kafka Streams client is in
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread
could not be added
+ */
+ public Optional<String> addStreamThread() {
+ synchronized (changeThreadCount) {
+ if (isRunningOrRebalancing()) {
+ final int threadIdx = getNextThreadIndex();
+ final long cacheSizePerThread =
getCacheSizePerThread(threads.size() + 1);
+ resizeThreadCache(cacheSizePerThread);
+ final StreamThread streamThread =
createStreamThread(cacheSizePerThread, threadIdx);
+ synchronized (stateLock) {
+ if (isRunningOrRebalancing()) {
+ streamThread.start();
+ return Optional.of(streamThread.getName());
+ } else {
+ streamThread.shutdown();
+ threads.remove(streamThread);
+
resizeThreadCache(getCacheSizePerThread(threads.size()));
+ return Optional.empty();
+ }
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ private int getNextThreadIndex() {
+ final HashSet<String> names = new HashSet<>();
+ for (final StreamThread streamThread: threads) {
+ names.add(streamThread.getName());
+ }
+ final String baseName = clientId + "-StreamThread-";
+ for (int i = 1; i <= threads.size(); i++) {
+ final String name = baseName + i;
+ if (!names.contains(name)) {
+ return i;
+ }
+ }
+ return threads.size() + 1;
+ }
+
private long getCacheSizePerThread(final int numStreamThreads) {
return totalCacheSize / (numStreamThreads + ((globalTaskTopology !=
null) ? 1 : 0));
}
- private void resizeThreadCache(final int numStreamThreads) {
- final long cacheSizePreThread = getCacheSizePerThread(numStreamThreads);
+ private void resizeThreadCache(final long cacheSizePerThread) {
for (final StreamThread streamThread: threads) {
- streamThread.resizeCache(cacheSizePreThread);
+ streamThread.resizeCache(cacheSizePerThread);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index af3d767..9e99d3b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -76,6 +76,9 @@ public class ThreadCache {
final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
maxCacheSizeBytes = newCacheSizeBytes;
if (shrink) {
+ if (caches.values().isEmpty()) {
+ return;
+ }
final CircularIterator<NamedCache> circularIterator = new
CircularIterator<>(caches.values());
while (sizeBytes() > maxCacheSizeBytes) {
final NamedCache cache = circularIterator.next();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b2e3971..20de104 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -78,6 +78,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
@@ -312,6 +313,9 @@ public class KafkaStreamsTest {
StreamThread.State.PARTITIONS_ASSIGNED);
return null;
}).anyTimes();
+ thread.resizeCache(EasyMock.anyLong());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(thread.getName()).andStubReturn("newThread");
thread.shutdown();
EasyMock.expectLastCall().andAnswer(() -> {
supplier.consumer.close();
@@ -589,6 +593,46 @@ public class KafkaStreamsTest {
}
@Test
+ public void shouldAddThreadWhenRunning() throws InterruptedException {
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+ streams.start();
+ final int oldSize = streams.threads.size();
+ TestUtils.waitForCondition(() -> streams.state() ==
KafkaStreams.State.RUNNING, 15L, "wait until running");
+ assertThat(streams.addStreamThread(),
equalTo(Optional.of("newThread")));
+ assertThat(streams.threads.size(), equalTo(oldSize + 1));
+ }
+
+ @Test
+ public void shouldNotAddThreadWhenCreated() {
+ final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+ final int oldSize = streams.threads.size();
+ assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+ assertThat(streams.threads.size(), equalTo(oldSize));
+ }
+
+ @Test
+ public void shouldNotAddThreadWhenClosed() {
+ final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+ final int oldSize = streams.threads.size();
+ streams.close();
+ assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+ assertThat(streams.threads.size(), equalTo(oldSize));
+ }
+
+ @Test
+ public void shouldNotAddThreadWhenError() {
+ final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+ final int oldSize = streams.threads.size();
+ streams.start();
+ streamThreadOne.shutdown();
+ streamThreadTwo.shutdown();
+ assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+ assertThat(streams.threads.size(), equalTo(oldSize));
+ }
+
+
+ @Test
public void testCannotStartOnceClosed() {
final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
new file mode 100644
index 0000000..649fc86
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private static String inputTopic;
+ private static StreamsBuilder builder;
+ private static Properties properties;
+ private static String appId = "";
+
+ @Before
+ public void setup() {
+ final String testId = safeUniqueTestName(getClass(), testName);
+ appId = "appId_" + testId;
+ inputTopic = "input" + testId;
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+ builder = new StreamsBuilder();
+ builder.stream(inputTopic);
+
+ properties = mkObjectProperties(
+ mkMap(
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath()),
+ mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+ mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class),
+ mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class)
+ )
+ );
+ }
+
+ @After
+ public void teardown() throws IOException {
+ purgeLocalStreamsState(properties);
+ }
+
+ @Test
+ public void shouldAddStreamThread() throws Exception {
+ try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+ final int oldThreadCount =
kafkaStreams.localThreadsMetadata().size();
+ assertThat(kafkaStreams.localThreadsMetadata().stream().map(t ->
t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new
String[] {"1", "2"}));
+
+ final Optional<String> name = kafkaStreams.addStreamThread();
+
+ assertThat(name, CoreMatchers.not(Optional.empty()));
+ TestUtils.waitForCondition(
+ () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+ .map(ThreadMetadata::threadName).anyMatch(t ->
t.equals(name.orElse(""))),
+ "Wait for the thread to be added"
+ );
+ assertThat(kafkaStreams.localThreadsMetadata().size(),
equalTo(oldThreadCount + 1));
+ assertThat(kafkaStreams.localThreadsMetadata().stream().map(t ->
t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new
String[] {"1", "2", "3"}));
+ TestUtils.waitForCondition(() -> kafkaStreams.state() ==
KafkaStreams.State.RUNNING, "wait for running");
+ }
+ }
+}