This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 1844948 KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)
1844948 is described below
commit 18449480a4608b817be868106aa797bb55c711cb
Author: John Roesler <[email protected]>
AuthorDate: Thu Oct 29 11:57:31 2020 -0500
KAFKA-10638: Fix QueryableStateIntegrationTest (#9521)
This test has been observed to have flaky failures.
Apparently, in the failed runs, Streams had entered a rebalance
before some of the assertions were made. We recently made
IQ a little stricter on whether it would return errors instead of
null responses in such cases:
KAFKA-10598: Improve IQ name and type checks (#9408)
As a result, we have started seeing failures now instead of
silently executing an invalid test (i.e., the test asserted that
the result was null, but it was null for the wrong reason).
Now, if the test discovers that Streams is no longer running,
it will repeat the verification until it actually gets a valid
positive or negative result.
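In outline, each verification is now wrapped in a retry loop.
Below is a minimal, self-contained sketch of that pattern. It is
illustrative only: the class name, the timeout parameter, and the
hard failure on timeout are inventions of this sketch; the patch's
actual helper is the until() method visible in the diff below.

    import java.util.function.BooleanSupplier;

    public final class UntilSketch {
        // Poll the condition until it holds or the deadline passes,
        // instead of asserting once against a possibly-rebalancing app.
        static void until(final long timeoutMs, final BooleanSupplier condition)
            throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() >= deadline) {
                    // Unlike the committed helper, this sketch fails loudly on timeout.
                    throw new AssertionError("condition never became true");
                }
                Thread.sleep(500L); // back off between attempts
            }
        }

        public static void main(final String[] args) throws InterruptedException {
            final long start = System.currentTimeMillis();
            // Example: the condition becomes true after roughly two seconds.
            until(10_000L, () -> System.currentTimeMillis() - start > 2_000L);
        }
    }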
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/streams/state/QueryableStoreType.java | 2 +-
.../integration/StoreQueryIntegrationTest.java | 265 +++++++++++++--------
2 files changed, 165 insertions(+), 102 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 79c335a..9771553 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -44,7 +44,7 @@ public interface QueryableStoreType<T> {
      * @param storeProvider provides access to all the underlying StateStore instances
      * @param storeName The name of the Store
      * @return a read-only interface over a {@code StateStore}
-     * (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     * (cf. {@link QueryableStoreTypes.KeyValueStoreType})
      */
     T create(final StateStoreProvider storeProvider,
              final String storeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 9cc5194..712ae91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -28,15 +28,16 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -44,6 +45,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -55,19 +58,25 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class StoreQueryIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -102,10 +111,10 @@ public class StoreQueryIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                .withCachingDisabled())
-            .toStream()
-            .peek((k, v) -> semaphore.release());
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -117,17 +126,34 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -138,10 +164,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                .withCachingDisabled())
-            .toStream()
-            .peek((k, v) -> semaphore.release());
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -153,51 +179,75 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyDontBelongPartition);
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;
+            }
+        });
     }
 
     @Test
@@ -208,10 +258,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                .withCachingDisabled())
-            .toStream()
-            .peek((k, v) -> semaphore.release());
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -224,17 +274,15 @@
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
 
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils
-                .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
     }
@@ -247,10 +295,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                .withCachingDisabled())
-            .toStream()
-            .peek((k, v) -> semaphore.release());
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
         final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
@@ -269,7 +317,7 @@
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
@@ -277,11 +325,11 @@
             .enableStaleStores()
             .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
 
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
@@ -289,8 +337,8 @@
             .fromNameAndType(TABLE_NAME, queryableStoreType)
             .enableStaleStores()
             .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
@@ -306,10 +354,10 @@
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-            Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                .withCachingDisabled())
-            .toStream()
-            .peek((k, v) -> semaphore.release());
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
 
         final Properties streamsConfiguration1 = streamsConfiguration();
         streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
@@ -337,34 +385,49 @@
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
 
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
-            .fromNameAndType(TABLE_NAME, queryableStoreType)
-            .enableStaleStores()
-            .withPartition(keyPartition);
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
 
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherParam = StoreQueryParameters
-            .fromNameAndType(TABLE_NAME, queryableStoreType)
-            .enableStaleStores()
-            .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    private static void until(final TestCondition condition) {
+        boolean success = false;
+        final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
+        while (!success && System.currentTimeMillis() < deadline) {
+            try {
+                success = condition.conditionMet();
+                Thread.sleep(500L);
+            } catch (final RuntimeException e) {
+                throw e;
+            } catch (final Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
         final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
         streamsToCleanup.add(streams);
@@ -378,12 +441,12 @@
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         IntegrationTestUtils.produceKeyValuesSynchronously(
-            INPUT_TOPIC_NAME,
-            IntStream.range(start, endExclusive)
-                .mapToObj(i -> KeyValue.pair(key, i))
-                .collect(Collectors.toList()),
-            producerProps,
-            mockTime);
+                INPUT_TOPIC_NAME,
+                IntStream.range(start, endExclusive)
+                        .mapToObj(i -> KeyValue.pair(key, i))
+                        .collect(Collectors.toList()),
+                producerProps,
+                mockTime);
     }
 
     private Properties streamsConfiguration() {