dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r429567784



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() 
throws Exception {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() 
throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
+
+        assertTrue(numStreamThreads > 1);
+        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a 
timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
+        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+        //key belongs to this partition
+        final int keyPartition = keyQueryMetadata.getPartition();
+
+        //key doesn't belongs to this partition
+        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.getActiveHost().port() % 2) == 1;
+
+        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
+                        .withPartition(keyPartition);
+        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+        if (kafkaStreams1IsActive) {
+            store1 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
+        } else {
+            store2 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
+        }
+
+        if (kafkaStreams1IsActive) {
+            assertThat(store1, is(notNullValue()));
+            assertThat(store2, is(nullValue()));
+        } else {
+            assertThat(store2, is(notNullValue()));
+            assertThat(store1, is(nullValue()));
+        }
+
+        // Assert that only active for a specific requested partition serves 
key if stale stores and not enabled

Review comment:
       correct, changed test to check both store types




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to