[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions
vcrfxia commented on code in PR #13592: URL: https://github.com/apache/kafka/pull/13592#discussion_r1171766477 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static java.time.Duration.ofSeconds; +import static java.util.Arrays.asList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +@Timeout(600) +@Tag("integration") +public class 
KStreamKStreamIntegrationTest { +private final static int NUM_BROKERS = 1; + +public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); +private final static MockTime MOCK_TIME = CLUSTER.time; +private final static String STREAM_1 = "stream1"; +private final static String STREAM_2 = "stream2"; +private final static String OUTPUT = "output-"; +private Properties streamsConfig; +private KafkaStreams streams; +private final static Properties CONSUMER_CONFIG = new Properties(); +private final static Properties PRODUCER_CONFIG = new Properties(); + +@BeforeAll +public static void startCluster() throws Exception { +CLUSTER.start(); +//Use multiple partitions to ensure distribution of keys. + +CLUSTER.createTopic(STREAM_1, 4, 1); +CLUSTER.createTopic(STREAM_2, 4, 1); +CLUSTER.createTopic(OUTPUT, 1, 1); + +CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); +CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer"); +CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +} + +@AfterAll +public static void closeCluster() { +CLUSTER.stop(); +} + +@BeforeEach +public void before(final TestInfo testInfo) throws IOException { +final String stateDirBasePath = TestUtils.tempDirectory().getPath(); +final String safeTestName = safeUniqueTestName(getClass(), testInfo); +streamsConfig = getStreamsConfig(safeTestName); +streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath); +} + +@AfterEach +public void after() throws IOException { +if (streams != null) { +streams.close(); +
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions
vcrfxia commented on code in PR #13592: URL: https://github.com/apache/kafka/pull/13592#discussion_r1169351304 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static java.time.Duration.ofSeconds; +import static java.util.Arrays.asList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +@Timeout(600) +@Tag("integration") +public class 
KStreamKStreamIntegrationTest { Review Comment: I'm guessing the reason this is added as a new test file rather than being added into the existing StreamStreamJoinIntegrationTest.java is that the existing test uses the TopologyTestDriver, which doesn't support testing with multiple partitions -- is that true? If that's the case, should we name the new file `KStreamKStreamMultiPartitionIntegrationTest` or similar, in order to better distinguish the two? ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import