This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49aed78  KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
49aed78 is described below

commit 49aed781d8b398975e8952ef21578c05114064ed
Author: Walker Carlson <[email protected]>
AuthorDate: Fri Aug 27 22:12:12 2021 -0500

    KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
    
    Add a new case to the list of possible retriable exceptions for the flaky tests to take care of threads starting up
    
    Reviewers: Leah Thomas <[email protected]>, Anna Sophie Blee-Goldman
---
 .../integration/StoreQueryIntegrationTest.java     | 58 +++++++---------------
 1 file changed, 18 insertions(+), 40 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index c4a236b..76916c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -148,16 +148,7 @@ public class StoreQueryIntegrationTest {
                 }
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                        "Unexpected exception thrown while getting the value from store.",
-                        exception.getMessage(),
-                        is(
-                                anyOf(
-                                        containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                                        containsString("The state store, source-table, may have migrated to another instance")
-                                )
-                        )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -241,16 +232,7 @@ public class StoreQueryIntegrationTest {
                 }
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -513,16 +495,7 @@ public class StoreQueryIntegrationTest {
                 assertThat(store1.get(key3), is(notNullValue()));
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -543,22 +516,27 @@ public class StoreQueryIntegrationTest {
                 assertThat(store1.get(key3), is(notNullValue()));
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
         });
     }
 
+    private void verifyRetrievableException(final Exception exception) {
+        assertThat(
+            "Unexpected exception thrown while getting the value from store.",
+            exception.getMessage(),
+            is(
+                anyOf(
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                    containsString("The state store, source-table, may have migrated to another instance"),
+                    containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING")
+                )
+            )
+        );
+    }
+
     private static void until(final TestCondition condition) {
         boolean success = false;
         final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;

Reply via email to