This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1844948  KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)
1844948 is described below

commit 18449480a4608b817be868106aa797bb55c711cb
Author: John Roesler <[email protected]>
AuthorDate: Thu Oct 29 11:57:31 2020 -0500

    KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)
    
    This test has been observed to have flaky failures.
    Apparently, in the failed runs, Streams had entered a rebalance
    before some of the assertions were made. We recently made
    IQ a little stricter on whether it would return errors instead of
    null responses in such cases:
    KAFKA-10598: Improve IQ name and type checks (#9408)
    
    As a result, we have started seeing failures now instead of
    silently executing an invalid test (I.e., it was asserting the
    return to be null, but the result was null for the wrong
    reason).
    
    Now, if the test discovers that Streams is no longer running,
    it will repeat the verification until it actually gets a valid
    positive or negative result.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/streams/state/QueryableStoreType.java    |   2 +-
 .../integration/StoreQueryIntegrationTest.java     | 265 +++++++++++++--------
 2 files changed, 165 insertions(+), 102 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 79c335a..9771553 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -44,7 +44,7 @@ public interface QueryableStoreType<T> {
      * @param storeProvider     provides access to all the underlying 
StateStore instances
      * @param storeName         The name of the Store
      * @return a read-only interface over a {@code StateStore}
-     *        (cf. {@link 
org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     *        (cf. {@link QueryableStoreTypes.KeyValueStoreType})
      */
     T create(final StateStoreProvider storeProvider,
              final String storeName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 9cc5194..712ae91 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -28,15 +28,16 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -44,6 +45,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -55,19 +58,25 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 
 @Category({IntegrationTest.class})
 public class StoreQueryIntegrationTest {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(StoreQueryIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -102,10 +111,10 @@ public class StoreQueryIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
@@ -117,17 +126,34 @@ public class StoreQueryIntegrationTest {
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), 
is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), 
is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -138,10 +164,10 @@ public class StoreQueryIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
@@ -153,51 +179,75 @@ public class StoreQueryIntegrationTest {
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition 
serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves 
key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), 
is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is 
requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be 
active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), 
is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition 
is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, 
storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -208,10 +258,10 @@ public class StoreQueryIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
@@ -224,17 +274,15 @@ public class StoreQueryIntegrationTest {
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
     }
@@ -247,10 +295,10 @@ public class StoreQueryIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
@@ -269,7 +317,7 @@ public class StoreQueryIntegrationTest {
 
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
param = StoreQueryParameters
@@ -277,11 +325,11 @@ public class StoreQueryIntegrationTest {
             .enableStaleStores()
             .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
@@ -289,8 +337,8 @@ public class StoreQueryIntegrationTest {
             .fromNameAndType(TABLE_NAME, queryableStoreType)
             .enableStaleStores()
             .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
@@ -306,10 +354,10 @@ public class StoreQueryIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                        .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());
 
         final Properties streamsConfiguration1 = streamsConfiguration();
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
@@ -337,34 +385,49 @@ public class StoreQueryIntegrationTest {
 
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
param = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyPartition);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
otherParam = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    private static void until(final TestCondition condition) {
+        boolean success = false;
+        final long deadline = System.currentTimeMillis() + 
IntegrationTestUtils.DEFAULT_TIMEOUT;
+        while (!success && System.currentTimeMillis() < deadline) {
+            try {
+                success = condition.conditionMet();
+                Thread.sleep(500L);
+            } catch (final RuntimeException e) {
+                throw e;
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private KafkaStreams createKafkaStreams(final StreamsBuilder builder, 
final Properties config) {
         final KafkaStreams streams = new KafkaStreams(builder.build(config), 
config);
         streamsToCleanup.add(streams);
@@ -378,12 +441,12 @@ public class StoreQueryIntegrationTest {
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                INPUT_TOPIC_NAME,
-                IntStream.range(start, endExclusive)
-                        .mapToObj(i -> KeyValue.pair(key, i))
-                        .collect(Collectors.toList()),
-                producerProps,
-                mockTime);
+            INPUT_TOPIC_NAME,
+            IntStream.range(start, endExclusive)
+                     .mapToObj(i -> KeyValue.pair(key, i))
+                     .collect(Collectors.toList()),
+            producerProps,
+            mockTime);
     }
 
     private Properties streamsConfiguration() {

Reply via email to