[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r536318737 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -845,67 +856,118 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, time, globalThreadId, delegatingStateRestoreListener, -this::defaultStreamsUncaughtExceptionHandler +streamsUncaughtExceptionHandler ); globalThreadState = globalStreamThread.state(); } // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); -final Map threadState = new HashMap<>(numStreamThreads); -final ArrayList storeProviders = new ArrayList<>(); -for (int i = 0; i < numStreamThreads; i++) { -final StreamThread streamThread = StreamThread.create( -internalTopologyBuilder, -config, -clientSupplier, -adminClient, -processId, -clientId, -streamsMetrics, -time, -streamsMetadataState, -cacheSizePerThread, -stateDirectory, -delegatingStateRestoreListener, -i + 1, -KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} - -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); - -final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); +threadState = new HashMap<>(numStreamThreads); +storeProviders = new ArrayList<>(); +streamStateListener = new StreamStateListener(threadState, globalThreadState); if (hasGlobalTopology) { globalStreamThread.setStateListener(streamStateListener); } -for (final StreamThread thread : threads) { -thread.setStateListener(streamStateListener); 
+for (int i = 1; i <= numStreamThreads; i++) { +createStreamThread(cacheSizePerThread, i); } +ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> +Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); + final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores()); queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + *
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535503453 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { Review comment: Expanding on this, the problem is in the shutdown thread: its join only waits for alive threads, and to be alive a thread needs to have been started. So if, in between the check and starting the thread, another thread transitions the state to NOT_RUNNING, the shutdown thread will not join the new thread. Then, when `addStreamThread()` continues, it will start the thread because it already passed the check, and we will have a thread running after the client has shut down. This would be an extremely tough race condition to find or reproduce, so it is best to just avoid it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535477229 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +streamThread.shutdown(); +threads.remove(streamThread); + resizeThreadCache(getCacheSizePerThread(threads.size())); +return Optional.empty(); +} +} +} +} +return Optional.empty(); +} -final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); -if (hasGlobalTopology) { -globalStreamThread.setStateListener(streamStateListener); +private int getNextThreadIndex() { +final HashSet names = new HashSet<>(); +for (final StreamThread streamThread: threads) { +names.add(streamThread.getName()); } -for (final StreamThread thread : threads) { -thread.setStateListener(streamStateListener); +final String baseName = clientId + "-StreamThread-"; +for (int i = 0; i < threads.size(); i++) { Review comment: Sure, I missed that suggestion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535471146 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler Review comment: Ah, good catch. The diff makes that hard to see, because the code was actually moved to a new method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535468574 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { Review comment: Well, `newThread` only synchronizes the `addStreamThread` method. There is still a race condition between the second `isRunningOrRebalancing()` check and starting the thread. It seems like a bad idea to leave that open, because it could cause thread state changes when there shouldn't be any. Starting the thread is relatively low cost, so this shouldn't have much impact performance-wise. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r534339275 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +threads.remove(streamThread); Review comment: Oh okay. When I shut down unstarted threads in a test, I got a `java.lang.IllegalStateException Unexpected state transition`, but it looks like that exception comes from the client. I added a new lock for adding threads, and the unstarted thread is now shut down. I think this addresses the problem you found with concurrent resizes, as well as @ableegoldman's concerns. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r534313845 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +threads.remove(streamThread); Review comment: Unfortunately, I don't think we can shut down a thread until we have started it. I don't think there should be a deadlock from just holding the state lock around most of the method; it is mostly about cost. However, I think that is probably the safest way. It solves our problem of needing to remove a thread we have just created, and we won't waste time resizing the cache unnecessarily. If we synchronize on a new lock, we end up with all the problems we were trying to solve earlier anyway: possible state changes, and having to clean up unstarted threads. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533707640 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +streamThread.setStateListener(streamStateListener); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); Review comment: I think these changes should be made whenever a state transition is changed or added. Removing a thread from the thread list is low cost, as is increasing the size of the cache, so it won't be expensive to make these changes in all cases. I think we have two good options: move the cache resize and the thread creation inside the `stateLock` block, or undo the changes we made if we have to abort starting the new thread. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533676976 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +streamThread.setStateListener(streamStateListener); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); Review comment: From running or rebalancing aren't those the only states we can get to? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533656732 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +streamThread.setStateListener(streamStateListener); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); Review comment: There will be two more cases of removal: in the replace-thread option and in the remove-thread option. I'm not really convinced it is necessary, but I don't see a problem with resizing the cache again if we do not start the thread. I don't think there will be any side effects, since the client should be shutting down. If we do resize, there will be a little extra info left in the thread state and store providers, but it would not get used. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) { final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes; maxCacheSizeBytes = newCacheSizeBytes; if (shrink) { +if (caches.values().isEmpty()) { Review comment: Yeah, I didn't realize this was a problem, but it showed up when I added more test coverage. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread 
createStreamThread(final long cacheSizePerThread, final int
threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533574172 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, time, globalThreadId, delegatingStateRestoreListener, -this::defaultStreamsUncaughtExceptionHandler +streamsUncaughtExceptionHandler ); globalThreadState = globalStreamThread.state(); } // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); -final Map threadState = new HashMap<>(numStreamThreads); -final ArrayList storeProviders = new ArrayList<>(); +threadState = new HashMap<>(numStreamThreads); +storeProviders = new ArrayList<>(); for (int i = 0; i < numStreamThreads; i++) { -final StreamThread streamThread = StreamThread.create( -internalTopologyBuilder, -config, -clientSupplier, -adminClient, -processId, -clientId, -streamsMetrics, -time, -streamsMetadataState, -cacheSizePerThread, -stateDirectory, -delegatingStateRestoreListener, -i + 1, -KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +createStreamThread(cacheSizePerThread, i + 1); Review comment: I think so ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); synchronized (stateLock) { if (state == State.CREATED) { +this.streamsUncaughtExceptionHandler = handler; Review comment: the `this.` is necessary. 
the parameter is the same name ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) { final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes; maxCacheSizeBytes = newCacheSizeBytes; if (shrink) { +if (caches.values().isEmpty()) { Review comment: Apparently `CircularIterators` throw an error if they are made on empty lists. And if there are no caches to resize we don't need to bother with the rest of this. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r529716169 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +@Category(IntegrationTest.class) +public class AdjustStreamThreadCountTest { + +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +private static String inputTopic; +private static StreamsBuilder builder; +private static Properties properties; +private static String appId = ""; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = "appId_" + testId; +inputTopic = "input" + testId; 
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + +builder = new StreamsBuilder(); +builder.stream(inputTopic); + +properties = mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), +mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2), +mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class) +) +); +} + +@After +public void teardown() throws IOException { +purgeLocalStreamsState(properties); +} + +private void produceMessages(final long timestamp, final String streamOneInput, final String msg) { +IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( +streamOneInput, +Collections.singletonList(new KeyValue<>("1", msg)), +TestUtils.producerConfig( +CLUSTER.bootstrapServers(), +StringSerializer.class, +StringSerializer.class, +new Properties()), +timestamp); +} + +@Test +public void shouldAddStreamThread() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { + StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); + +
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r529713764 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +streamThread.setStateListener(streamStateListener); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +} else { +return Optional.empty(); +} +} +return Optional.of(streamThread.getName()); Review comment: sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r529709595 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +@Category(IntegrationTest.class) +public class AdjustStreamThreadCountTest { + +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + +@Rule +public TestName testName = new TestName(); + +private static String inputTopic; +private static StreamsBuilder builder; +private static Properties properties; +private static String appId = ""; + +@Before +public void setup() { +final String testId = safeUniqueTestName(getClass(), testName); +appId = "appId_" + testId; +inputTopic = "input" + testId; 
+IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); + +builder = new StreamsBuilder(); +builder.stream(inputTopic); + +properties = mkObjectProperties( +mkMap( +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), +mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2), +mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class) +) +); +} + +@After +public void teardown() throws IOException { +purgeLocalStreamsState(properties); +} + +private void produceMessages(final long timestamp, final String streamOneInput, final String msg) { +IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( +streamOneInput, +Collections.singletonList(new KeyValue<>("1", msg)), +TestUtils.producerConfig( +CLUSTER.bootstrapServers(), +StringSerializer.class, +StringSerializer.class, +new Properties()), +timestamp); +} + +@Test +public void shouldAddStreamThread() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { + StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams); +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); + +
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527857267 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { Review comment: @cadonna added This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527836277 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { Review comment: Okay, we can do the state check first and then synchronize only around starting the thread, to make sure the client doesn't shut down between the check and the start. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527277048 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { Review comment: Well we don't want it changing state while adding a thread This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527277135 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { Review comment: You are right, I'll add one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527099779 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; +final long cacheSizePerThread = getCacheSizePerThread(threadIdx); +resizeThreadCache(threadIdx); +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +streamThread.setStateListener(streamStateListener); +return Optional.of(streamThread.getName()); +} else { +return Optional.empty(); +} +} +} Review comment: right before the positive return :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527099742 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; Review comment: Looks like I didn't understand threadIdx. 
that makes sense now ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; +final long cacheSizePerThread = getCacheSizePerThread(threadIdx); +resizeThreadCache(threadIdx); +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +streamThread.setStateListener(streamStateListener); +return Optional.of(streamThread.getName()); +} else { +return Optional.empty(); +} +} +} Review comment: right before the positive return This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org