lucasbru commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246586612


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -113,6 +113,7 @@ public void before(final TestInfo testInfo) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Doesn't this integration test work with the state updater? At the very
least, a comment explaining why it is disabled would be good here.



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -208,6 +208,8 @@ private Properties configProps(final String appId, final String host) {
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
+        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Same here: why do we disable the state updater?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
     public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
+        final Properties props = configProps(true);
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Same here. If possible, add a comment explaining why this test is invalid
for the state updater.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
     @Test
     public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
         final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   In the corresponding state updater (SU) test, can we remove the
`streamsConfigProps.put`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
 
     @Override
     public StateStore getStore(final String name) {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.getStore(name);

Review Comment:
   It would be nice to expose a read-only state store here (hiding init,
flush, close, and the like), but that's probably for a different PR.
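
To make the suggestion above concrete, here is a minimal sketch of a read-only view that hides lifecycle and mutating methods by exposing a narrower interface. Everything in it is hypothetical: `ReadableSketchStore`, `SketchStore`, `InMemorySketchStore`, and `ReadOnlySketchStoreView` are stand-ins invented for illustration and are not Kafka's `StateStore` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only surface: lookups only, no lifecycle or mutation.
interface ReadableSketchStore {
    String name();
    String get(String key);
}

// Hypothetical full store surface (a stand-in, not Kafka's StateStore).
interface SketchStore extends ReadableSketchStore {
    void put(String key, String value);
    void flush();
    void close();
}

// Simple in-memory store, used only to exercise the view.
final class InMemorySketchStore implements SketchStore {
    private final String name;
    private final Map<String, String> data = new HashMap<>();

    InMemorySketchStore(final String name) {
        this.name = name;
    }

    @Override public String name() { return name; }
    @Override public String get(final String key) { return data.get(key); }
    @Override public void put(final String key, final String value) { data.put(key, value); }
    @Override public void flush() { /* nothing to flush in memory */ }
    @Override public void close() { data.clear(); }
}

// Forwards reads to the delegate. Because the view implements only the
// narrow interface, callers cannot reach put, flush, or close through it.
final class ReadOnlySketchStoreView implements ReadableSketchStore {
    private final SketchStore delegate;

    ReadOnlySketchStoreView(final SketchStore delegate) {
        this.delegate = delegate;
    }

    @Override public String name() { return delegate.name(); }
    @Override public String get(final String key) { return delegate.get(key); }
}
```

Returning the narrow type, rather than throwing `UnsupportedOperationException` from the hidden methods, moves the check to compile time; whether that fits the real `getStore` signature is a question for the follow-up PR.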



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
+        asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));

Review Comment:
   Why did the expected output data change?
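
For context, one plausible reading (an assumption, not confirmed in this thread) is that the change follows from the same PR setting `STATESTORE_CACHE_MAX_BYTES_CONFIG` to 0 in this file, since the removed comment described the old list as the output "with caching". The sketch below is a simplified model, not Kafka's cache: `CountCachingSketch` and both methods are hypothetical, and the cached variant assumes a single flush at the end with entries ordered by last update.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CountCachingSketch {

    // Models a count with no cache: every input record emits an updated count.
    static List<String> countWithoutCaching(final List<String> keys) {
        final Map<String, Long> counts = new LinkedHashMap<>();
        final List<String> output = new ArrayList<>();
        for (final String key : keys) {
            final long updated = counts.merge(key, 1L, Long::sum);
            output.add(key + "=" + updated);
        }
        return output;
    }

    // Models a count with a cache that is flushed once at the end: only the
    // latest count per key is emitted, ordered by when the key was last updated.
    static List<String> countWithCaching(final List<String> keys) {
        final Map<String, Long> dirty = new LinkedHashMap<>();
        for (final String key : keys) {
            // Remove and re-insert so the key moves to the end of the order.
            final Long previous = dirty.remove(key);
            dirty.put(key, previous == null ? 1L : previous + 1L);
        }
        final List<String> output = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : dirty.entrySet()) {
            output.add(entry.getKey() + "=" + entry.getValue());
        }
        return output;
    }
}
```

With the keys from `STANDARD_INPUT_DATA` (A, B, A, C, C), the uncached model yields A=1, B=1, A=2, C=1, C=2 and the cached model yields B=1, A=2, C=2, which line up with the new and old values of `COUNT_OUTPUT_DATA` respectively.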



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {

Review Comment:
   What does "Running" mean? Is this existing terminology, or are you
introducing new terms here? How does it differ from "Processing" tasks, and
didn't we also have "Owned" tasks somewhere?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-        final boolean stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+                config.originals(),
+                InternalConfig.STATE_UPDATER_ENABLED,
+                InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   It could make sense to add a small helper, e.g.
`InternalConfig.getStateUpdateEnabled(config)`.
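
A rough sketch of what such a helper might look like follows. It is written against a hypothetical stand-in class, `InternalConfigSketch`, so that it compiles on its own; the key string, the default value, and the method name `getStateUpdaterEnabled` are all assumptions, and the helper takes the originals map rather than a `StreamsConfig`.

```java
import java.util.HashMap;
import java.util.Map;

final class InternalConfigSketch {

    // Assumed key string; a placeholder for illustration only.
    static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";

    // Placeholder value; the real default is not shown in the diff.
    static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

    // Reads a boolean that may be stored as a Boolean or as a String.
    static boolean getBoolean(final Map<String, Object> configs,
                              final String key,
                              final boolean defaultValue) {
        final Object value = configs.getOrDefault(key, defaultValue);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    // Hypothetical helper: keeps the key and its default in one place so
    // call sites do not repeat them.
    static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
        return getBoolean(configs, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT);
    }

    public static void main(final String[] args) {
        final Map<String, Object> originals = new HashMap<>();
        originals.put(STATE_UPDATER_ENABLED, false);
        System.out.println(getStateUpdaterEnabled(originals));
    }
}
```

A call site such as the one in `StreamThread.create` would then shrink to a single call on `config.originals()`.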



