ableegoldman commented on a change in pull request #11812:
URL: https://github.com/apache/kafka/pull/11812#discussion_r816342899



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -775,6 +781,70 @@ public void shouldWaitForMissingInputTopicsToBeCreated() 
throws Exception {
         }
     }
 
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws 
Exception {
+        final AtomicInteger noOutputExpected = new AtomicInteger(0);
+        final AtomicInteger outputExpected = new AtomicInteger(0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+
+        streams = new KafkaStreamsNamedTopologyWrapper(props);
+        streams.setUncaughtExceptionHandler(exception -> 
StreamThreadExceptionResponse.REPLACE_THREAD);
+
+        final NamedTopologyBuilder builder = 
streams.newNamedTopologyBuilder("topology_A");
+        builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> 
outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);

Review comment:
       Ah yeah I meant to add a second integration test variant that's 
`FromDifferentTopologies` instead of `WithinSameTopology` -- but I'm still 
working on it, so I didn't want to block this PR on it necessarily. But short 
answer is "yes", definitely




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to