[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-04 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r536318737



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -845,67 +856,118 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 time,
 globalThreadId,
 delegatingStateRestoreListener,
-this::defaultStreamsUncaughtExceptionHandler
+streamsUncaughtExceptionHandler
 );
 globalThreadState = globalStreamThread.state();
 }
 
 // use client id instead of thread client id since this admin client 
may be shared among threads
 adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-final Map threadState = new 
HashMap<>(numStreamThreads);
-final ArrayList storeProviders = new 
ArrayList<>();
-for (int i = 0; i < numStreamThreads; i++) {
-final StreamThread streamThread = StreamThread.create(
-internalTopologyBuilder,
-config,
-clientSupplier,
-adminClient,
-processId,
-clientId,
-streamsMetrics,
-time,
-streamsMetadataState,
-cacheSizePerThread,
-stateDirectory,
-delegatingStateRestoreListener,
-i + 1,
-KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
-
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
-
-final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
+threadState = new HashMap<>(numStreamThreads);
+storeProviders = new ArrayList<>();
+streamStateListener = new StreamStateListener(threadState, 
globalThreadState);
 if (hasGlobalTopology) {
 globalStreamThread.setStateListener(streamStateListener);
 }
-for (final StreamThread thread : threads) {
-thread.setStateListener(streamStateListener);
+for (int i = 1; i <= numStreamThreads; i++) {
+createStreamThread(cacheSizePerThread, i);
 }
 
+ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
+Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+
 final GlobalStateStoreProvider globalStateStoreProvider = new 
GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535503453



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {

Review comment:
   Expanding on this, the problem is in the shutdown thread. The join 
only waits for alive threads, and to be alive a thread needs to have been started. 
   
   So if, between the check and starting the thread, another thread transitions 
the state to NOT_RUNNING, the new thread will not be joined by the shutdown thread. Then, 
when it continues, it will start because it passed the check, and we will have a 
thread running after the client has shut down.
   
   This would be an extremely tough race condition to find or reproduce, so it is best 
to just avoid it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535477229



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+streamThread.shutdown();
+threads.remove(streamThread);
+
resizeThreadCache(getCacheSizePerThread(threads.size()));
+return Optional.empty();
+}
+}
+}
+}
+return Optional.empty();
+}
 
-final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
-if (hasGlobalTopology) {
-globalStreamThread.setStateListener(streamStateListener);
+private int getNextThreadIndex() {
+final HashSet names = new HashSet<>();
+for (final StreamThread streamThread: threads) {
+names.add(streamThread.getName());
 }
-for (final StreamThread thread : threads) {
-thread.setStateListener(streamStateListener);
+final String baseName = clientId + "-StreamThread-";
+for (int i = 0; i < threads.size(); i++) {

Review comment:
   Sure, I missed that suggestion





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535471146



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler

Review comment:
   Ah, good catch. The diff makes that hard to see, as it was actually moved 
to a new method.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535468574



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {

Review comment:
   Well, newThread only synchronizes the addThread method. There is still a race 
condition between the second isRunningOrRebalancing check and starting the thread. It 
seems like a bad idea to leave that open, as it could cause thread state changes 
when there shouldn't be any. Starting the thread is relatively low cost, so this 
shouldn't have much performance impact.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-02 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534339275



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+threads.remove(streamThread);

Review comment:
   Oh okay, when I shut down unstarted threads in a test I got a 
`java.lang.IllegalStateException Unexpected state transition`. But it looks 
like that comes from the client.
   
   I added a new lock for adding threads, and the thread is now shut down. I think 
this addresses the problem you found with concurrent resizes, as well as 
@ableegoldman's concerns.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-02 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534313845



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+threads.remove(streamThread);

Review comment:
   Unfortunately, I don't think we can shut down a thread until we have 
started it.
   
I don't think there should be a deadlock from just using the state lock 
around most of the method. It is mostly about cost. However, I think that it is 
probably the safest way, as it solves our problem of needing to remove a 
thread we have just created, and we won't potentially waste time resizing the 
cache.
   
   If we synchronize on a new lock, we end up with all the problems we were 
trying to solve earlier anyway: the possible state changes and having to 
clean up unstarted threads.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533707640



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   I think these changes should be made whenever a state transition is changed or added. 
Removing from the thread list is low cost, as is increasing the 
size of the cache, so it won't be expensive to make these changes in all cases.
   
   I think the two good options we have are: move the cache resize 
and thread creation into the stateLock, or undo the changes we made if we 
have to abort starting the new thread. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533676976



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   From RUNNING or REBALANCING, aren't those the only states we can get to?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533656732



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   There will be two more cases of removal: in the replace-thread option and 
in the remove-thread option.
   
   I'm not really convinced it is necessary, but I don't see a problem with 
resizing the cache again if we do not start the thread. I don't think there will 
be any side effects, as the client should be shutting down. If we don't resize, 
there would be a little extra info in the state and store providers, but it 
would not get used.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
 final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
 maxCacheSizeBytes = newCacheSizeBytes;
 if (shrink) {
+if (caches.values().isEmpty()) {

Review comment:
   yeah, I didn't realize this was a problem, but when I added more test 
coverage it showed up

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533574172



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 time,
 globalThreadId,
 delegatingStateRestoreListener,
-this::defaultStreamsUncaughtExceptionHandler
+streamsUncaughtExceptionHandler
 );
 globalThreadState = globalStreamThread.state();
 }
 
 // use client id instead of thread client id since this admin client 
may be shared among threads
 adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-final Map threadState = new 
HashMap<>(numStreamThreads);
-final ArrayList storeProviders = new 
ArrayList<>();
+threadState = new HashMap<>(numStreamThreads);
+storeProviders = new ArrayList<>();
 for (int i = 0; i < numStreamThreads; i++) {
-final StreamThread streamThread = StreamThread.create(
-internalTopologyBuilder,
-config,
-clientSupplier,
-adminClient,
-processId,
-clientId,
-streamsMetrics,
-time,
-streamsMetadataState,
-cacheSizePerThread,
-stateDirectory,
-delegatingStateRestoreListener,
-i + 1,
-KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+createStreamThread(cacheSizePerThread, i + 1);

Review comment:
   I think so

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler st
 final Consumer handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
 synchronized (stateLock) {
 if (state == State.CREATED) {
+this.streamsUncaughtExceptionHandler = handler;

Review comment:
   The `this.` is necessary because the parameter has the same name as the field.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
 final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
 maxCacheSizeBytes = newCacheSizeBytes;
 if (shrink) {
+if (caches.values().isEmpty()) {

Review comment:
   Apparently a `CircularIterator` throws an exception if it is created over 
an empty list. And if there are no caches to resize, we don't need to bother 
with the rest of this.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-24 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529716169



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+private static String inputTopic;
+private static StreamsBuilder builder;
+private static Properties properties;
+private static String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+builder.stream(inputTopic);
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class),
+
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class)
+)
+);
+}
+
+@After
+public void teardown() throws IOException {
+purgeLocalStreamsState(properties);
+}
+
+private void produceMessages(final long timestamp, final String 
streamOneInput, final String msg) {
+IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+streamOneInput,
+Collections.singletonList(new KeyValue<>("1", msg)),
+TestUtils.producerConfig(
+CLUSTER.bootstrapServers(),
+StringSerializer.class,
+StringSerializer.class,
+new Properties()),
+timestamp);
+}
+
+@Test
+public void shouldAddStreamThread() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+
+

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-24 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529713764



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+} else {
+return Optional.empty();
+}
+}
+return Optional.of(streamThread.getName());

Review comment:
   sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-24 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529709595



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+@Rule
+public TestName testName = new TestName();
+
+private static String inputTopic;
+private static StreamsBuilder builder;
+private static Properties properties;
+private static String appId = "";
+
+@Before
+public void setup() {
+final String testId = safeUniqueTestName(getClass(), testName);
+appId = "appId_" + testId;
+inputTopic = "input" + testId;
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+builder  = new StreamsBuilder();
+builder.stream(inputTopic);
+
+properties  = mkObjectProperties(
+mkMap(
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class),
+
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class)
+)
+);
+}
+
+@After
+public void teardown() throws IOException {
+purgeLocalStreamsState(properties);
+}
+
+private void produceMessages(final long timestamp, final String 
streamOneInput, final String msg) {
+IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+streamOneInput,
+Collections.singletonList(new KeyValue<>("1", msg)),
+TestUtils.producerConfig(
+CLUSTER.bootstrapServers(),
+StringSerializer.class,
+StringSerializer.class,
+new Properties()),
+timestamp);
+}
+
+@Test
+public void shouldAddStreamThread() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+
+

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-20 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527857267



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {

Review comment:
   @cadonna added





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-20 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527836277



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {

Review comment:
   Okay, we can do the check first and then synchronize only around starting 
the thread, to make sure the client doesn't shut down between the check and 
the start.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277048



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {

Review comment:
   Well, we don't want it changing state while adding a thread.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277135



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {

Review comment:
   You are right, I'll add one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099779



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;
+final long cacheSizePerThread = 
getCacheSizePerThread(threadIdx);
+resizeThreadCache(threadIdx);
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+streamThread.setStateListener(streamStateListener);
+return Optional.of(streamThread.getName());
+} else {
+return Optional.empty();
+}
+}
+}

Review comment:
   right before the positive return :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099742



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;

Review comment:
   Looks like I didn't understand `threadIdx`. That makes sense now.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;
+final long cacheSizePerThread = 
getCacheSizePerThread(threadIdx);
+resizeThreadCache(threadIdx);
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+streamThread.setStateListener(streamStateListener);
+return Optional.of(streamThread.getName());
+} else {
+return Optional.empty();
+}
+}
+}

Review comment:
   right before the positive return





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org