lucasbru commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246586612
########## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ########## @@ -113,6 +113,7 @@ public void before(final TestInfo testInfo) { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: doesn't the integration test work with state updater? A comment would be good here at least ########## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ########## @@ -208,6 +208,8 @@ private Properties configProps(final String appId, final String host) { streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: Same here - why do we disable the state updater here? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); + final Properties props = configProps(true); + props.put(InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: same here. if possible, a comment why this test is invalid for state updater ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() { @Test public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() { final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); + streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: In the corresponding SU test, can we remove the `streamsConfigProps.put`? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java: ########## @@ -201,7 +201,7 @@ public boolean commitNeeded() { @Override public StateStore getStore(final String name) { - throw new UnsupportedOperationException("This task is read-only"); + return task.getStore(name); Review Comment: Would be nice to expose a read only state store here (hiding init, flush, close and the like), but that's probably for a differenet PR ########## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ########## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = - asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching + asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)); Review Comment: why did the output data change? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() { return activeTaskStream().collect(Collectors.toList()); } + List<Task> activeRunningTaskIterable() { Review Comment: What does "Running" mean? Is this existing terminology or are you introducing new terms here? What is the difference to "Processing" tasks, and didn't we also have "Owned" tasks somewhere? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); - final boolean stateUpdaterEnabled = - InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false); + final boolean stateUpdaterEnabled = InternalConfig.getBoolean( + config.originals(), + InternalConfig.STATE_UPDATER_ENABLED, + InternalConfig.STATE_UPDATER_ENABLED_DEFAULT + ); Review Comment: could make sense to make a little `InternalConfig.getStateUpdateEnabled(config)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org