[
https://issues.apache.org/jira/browse/KAFKA-6256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310117#comment-16310117
]
ASF GitHub Bot commented on KAFKA-6256:
---------------------------------------
guozhangwang closed pull request #4364: KAFKA-6256: fix flaky test
KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
URL: https://github.com/apache/kafka/pull/4364
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 8d4299bb1af..f90dc4bc174 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -33,13 +32,16 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -49,8 +51,11 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -62,7 +67,7 @@
@Category({IntegrationTest.class})
public class KStreamKTableJoinIntegrationTest {
private static final int NUM_BROKERS = 1;
- private static final long COMMIT_INTERVAL_MS = 300L;
+ private static final long COMMIT_INTERVAL_MS = 1000L;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -90,11 +95,8 @@ public void before() throws InterruptedException {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
- TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE,
true);
-
-
}
@After
@@ -185,6 +187,11 @@ private void countClicksPerRegion(final int
cacheSizeBytes) throws Exception {
new KeyValue<>("europe", 109L),
new KeyValue<>("asia", 124L)
);
+ final Map<String, Long> remainingExpectedResult = new HashMap<>();
+ for (final KeyValue<String, Long> record : expectedClicksPerRegion) {
+ remainingExpectedResult.put(record.key, record.value);
+ }
+ final AtomicInteger outputCounter = new AtomicInteger();
//
// Step 1: Configure and start the processor topology.
@@ -251,10 +258,23 @@ public RegionWithClicks apply(final Long clicks, final
String region) {
public Long apply(final Long value1, final Long value2) {
return value1 + value2;
}
- }, "ClicksPerRegionUnwindowed");
+ });
// Write the (continuously updating) results to the output topic.
- clicksPerRegion.to(stringSerde, longSerde, outputTopic);
+ clicksPerRegion
+ .toStream()
+ .peek(new ForeachAction<String, Long>() {
+ @Override
+ public void apply(final String key, final Long value) {
+ outputCounter.incrementAndGet();
+
+ final long finalValue = remainingExpectedResult.get(key);
+ if (value == finalValue) {
+ remainingExpectedResult.remove(key);
+ }
+ }
+ })
+ .to(outputTopic, Produced.with(stringSerde, longSerde));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
@@ -273,7 +293,6 @@ public Long apply(final Long value1, final Long value2) {
userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
userRegions, userRegionsProducerConfig, mockTime);
-
//
// Step 3: Publish some user click events.
//
@@ -295,10 +314,26 @@ public Long apply(final Long value1, final Long value2) {
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class);
- final List<KeyValue<String, Long>> actualClicksPerRegion =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
- outputTopic, expectedClicksPerRegion.size());
-
- assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return remainingExpectedResult.isEmpty();
+ }
+ }, "Never received expected result.");
+
+ final int expectedResultSize = expectedClicksPerRegion.size();
+ final int expectedNumberOfOutputs = (cacheSizeBytes == 0)
+ ? expectedResultSize
+ : Math.max(expectedResultSize, outputCounter.get());
+ final List<KeyValue<String, Long>> actualClicksPerRegion =
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ consumerConfig,
+ outputTopic,
+ expectedNumberOfOutputs);
+
+ assertThat(
+ actualClicksPerRegion.subList(expectedNumberOfOutputs -
expectedResultSize, expectedNumberOfOutputs),
+ equalTo(expectedClicksPerRegion));
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Flaky Unit test:
> KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6256
> URL: https://issues.apache.org/jira/browse/KAFKA-6256
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Fix For: 1.1.0, 1.0.1
>
>
> {noformat}
> Expected: <[KeyValue(americas, 101), KeyValue(europe, 109), KeyValue(asia,
> 124)]>
> but: was <[KeyValue(europe, 13), KeyValue(asia, 25), KeyValue(americas,
> 23)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.junit.Assert.assertThat(Assert.java:956)
> at org.junit.Assert.assertThat(Assert.java:923)
> at
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.countClicksPerRegion(KStreamKTableJoinIntegrationTest.java:301)
> at
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegionWithNonZeroByteCache(KStreamKTableJoinIntegrationTest.java:144)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)