[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-19 Thread via GitHub


vcrfxia commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1171766477


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {
+    private final static int NUM_BROKERS = 1;
+
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String STREAM_1 = "stream1";
+    private final static String STREAM_2 = "stream2";
+    private final static String OUTPUT = "output-";
+    private Properties streamsConfig;
+    private KafkaStreams streams;
+    private final static Properties CONSUMER_CONFIG = new Properties();
+    private final static Properties PRODUCER_CONFIG = new Properties();
+
+    @BeforeAll
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+        // Use multiple partitions to ensure distribution of keys.
+
+        CLUSTER.createTopic(STREAM_1, 4, 1);
+        CLUSTER.createTopic(STREAM_2, 4, 1);
+        CLUSTER.createTopic(OUTPUT, 1, 1);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws IOException {
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        streamsConfig = getStreamsConfig(safeTestName);
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        if (streams != null) {
+            streams.close();
+ 
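
For context, the imports and setup quoted above (KStream, ValueJoiner, JoinWindows, two 4-partition input topics, one single-partition output topic) suggest the test builds an outer stream-stream join. The snippet below is only a sketch of such a topology, not the PR's actual code: it assumes default String serdes are set by getStreamsConfig, and the window sizes and value joiner are illustrative.

    // Sketch only: outer stream-stream join over the multi-partition inputs.
    // STREAM_1, STREAM_2, OUTPUT and streamsConfig come from the test class above;
    // the window sizes and the joiner are illustrative, not taken from the PR.
    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> leftStream = builder.stream(STREAM_1);
    final KStream<String, String> rightStream = builder.stream(STREAM_2);

    leftStream.outerJoin(
            rightStream,
            (ValueJoiner<String, String, String>) (left, right) -> left + "-" + right,
            JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofSeconds(30)))
        .to(OUTPUT);

    streams = new KafkaStreams(builder.build(), streamsConfig);
    startApplicationAndWaitUntilRunning(Collections.singletonList(streams), ofSeconds(60));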

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13592: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions

2023-04-17 Thread via GitHub


vcrfxia commented on code in PR #13592:
URL: https://github.com/apache/kafka/pull/13592#discussion_r1169351304


##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {

Review Comment:
   I'm guessing the reason this is added as a new test file, rather than being added into the existing StreamStreamJoinIntegrationTest.java, is because the existing test uses the TopologyTestDriver, which doesn't support testing with multiple partitions -- is that true?
   
   If that's the case, should we name the new file `KStreamKStreamMultiPartitionIntegrationTest` or similar, in order to better distinguish the two?
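
For reference, the TopologyTestDriver runs a topology as a single task, so every record flows through one logical partition no matter how the real input topics are partitioned, which is why it cannot reproduce a multi-partition bug like this one. A rough sketch of a driver-based test follows; the topic names are hypothetical and 'topology' is assumed to be the join topology under test.

    // Sketch only: all records pass through one task in TopologyTestDriver, so
    // multi-partition interleaving of the outer join can never be exercised here.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
        final TestInputTopic<String, String> left =
            driver.createInputTopic("left", new StringSerializer(), new StringSerializer());
        final TestInputTopic<String, String> right =
            driver.createInputTopic("right", new StringSerializer(), new StringSerializer());
        final TestOutputTopic<String, String> output =
            driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

        left.pipeInput("A", "a", 0L);
        right.pipeInput("B", "b", 0L);
        // Advance stream time past the join window plus grace period so any
        // non-joined (outer) results would be emitted.
        left.pipeInput("C", "c", 60_000L);

        final List<KeyValue<String, String>> results = output.readKeyValuesToList();
    }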



##
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java:
##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import