This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch repro-task-idling-problem in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 95847cf747c36dd538dddf5b0f9c47d8d92a952b Author: John Roesler <j...@vvcephei.org> AuthorDate: Tue Sep 27 10:46:49 2022 -0500 tmp --- .../KafkaStreamsCloseOptionsIntegrationTest.java | 32 ++++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index 8d3cb8e8795..90fa323e281 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; @@ -152,6 +153,19 @@ public class KafkaStreamsCloseOptionsIntegrationTest { } } + public void tmp() { + StreamsBuilder streamsBuilder = new StreamsBuilder(); + KStream<Object, Object> stream = streamsBuilder.stream("inStream"); + KTable<Object, Object> table = streamsBuilder.table("inTable"); + KStream<Object, Object> result = stream.join(table, (l, r) -> new Object()); + result.to("output"); + Properties properties = new Properties(); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties); + kafkaStreams.start(); + } + @Test public void testCloseOptions() throws Exception { final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); @@ -180,15 +194,15 @@ public class KafkaStreamsCloseOptionsIntegrationTest { private void add10InputElements() { final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"), - KeyValue.pair(1L, "bbb"), - KeyValue.pair(0L, "ccc"), - KeyValue.pair(1L, "ddd"), - KeyValue.pair(0L, "eee"), - KeyValue.pair(1L, "fff"), - KeyValue.pair(0L, "ggg"), - KeyValue.pair(1L, "hhh"), - KeyValue.pair(0L, "iii"), - KeyValue.pair(1L, "jjj")); + KeyValue.pair(1L, "bbb"), + KeyValue.pair(0L, "ccc"), + KeyValue.pair(1L, "ddd"), + KeyValue.pair(0L, "eee"), + KeyValue.pair(1L, "fff"), + KeyValue.pair(0L, "ggg"), + KeyValue.pair(1L, "hhh"), + KeyValue.pair(0L, "iii"), + KeyValue.pair(1L, "jjj")); for (final KeyValue<Long, String> record : records) { mockTime.sleep(10);