mjsax commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r2026210742


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -644,16 +645,16 @@ public void 
shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
 
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (materialized) {
                     assertThat(
-                        asMap(store),
+                        asList(store),

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -712,12 +713,12 @@ public void 
shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati
 
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
                 );
-                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                assertThat(outputTopic.readKeyValuesToList(), is(expected));
                 if (materialized) {
-                    assertThat(asMap(store), is(expected));
+                    assertThat(asList(store), is(expected));

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -423,15 +428,15 @@ public void 
shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 
             {
-                final Map<String, String> expected =
-                    leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) 
: emptyMap();
+                final List<KeyValue<String, String>> expected =
+                    leftJoin ? Collections.singletonList(KeyValue.pair("lhs1", 
"(lhsValue1|rhs1,null)")) : emptyList();
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (materialized) {
                     assertThat(
-                        asMap(store),
+                        asList(store),

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -753,23 +754,23 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final 
String optimization,
             final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
                 );
-                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                assertThat(outputTopic.readKeyValuesToList(), is(expected));
                 if (materialized) {
-                    assertThat(asMap(store), is(expected));
+                    assertThat(asList(store), is(expected));

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -307,12 +310,13 @@ public void doJoinFromRightThenDeleteRightEntity(final 
boolean leftJoin,
             left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
 
             assertThat(
-                outputTopic.readKeyValuesToMap(),
+                outputTopic.readKeyValuesToList(),
                 is(leftJoin
-                    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-                    mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-                    : emptyMap()
+                    ? Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)"),

Review Comment:
   nit: indentation



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -795,6 +796,12 @@ protected static Map<String, String> asMap(final 
KeyValueStore<String, ValueAndT
         return result;
     }
 
+    protected static List<KeyValue<String, String>> asList(final 
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+        final List<KeyValue<String, String>> result = new LinkedList<>();
+        store.all().forEachRemaining(kv -> result.add(new KeyValue<>(kv.key, 
kv.value.value()))); // TODO: find a better solution

Review Comment:
   This looks ok to me?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -753,23 +754,23 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final 
String optimization,
             final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
                 );
-                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                assertThat(outputTopic.readKeyValuesToList(), is(expected));
                 if (materialized) {
-                    assertThat(asMap(store), is(expected));
+                    assertThat(asList(store), is(expected));
                 }
                 Assertions.assertNotNull(subscriptionStore.get(key));
             }
             left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|returnNull,null)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|returnNull,null)")
                 );
-                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                assertThat(outputTopic.readKeyValuesToList(), is(expected));
                 if (materialized) {
-                    assertThat(asMap(store), is(expected));
+                    assertThat(asList(store), is(expected));

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -662,16 +663,16 @@ public void 
shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
             // Change LHS foreign key reference
             left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 3);
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+                final List<KeyValue<String, String>> expected = 
Collections.singletonList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs2,rhsValue2)")
                 );
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (materialized) {
                     assertThat(
-                        asMap(store),
+                        asList(store),

Review Comment:
   as above



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -203,26 +206,26 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
             left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
             {
-                final Map<String, String> expected = mkMap(
-                    mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-                    mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
-                );
+                final List<KeyValue<String, String>> expected = Arrays.asList(
+                    KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                    KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)"));
+
                 assertThat(
-                    outputTopic.readKeyValuesToMap(),
+                    outputTopic.readKeyValuesToList(),
                     is(expected)
                 );
                 if (rejoin) {
                     assertThat(
-                        rejoinOutputTopic.readKeyValuesToMap(),
-                        is(mkMap(
-                            mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-                            mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
-                        ))
+                        rejoinOutputTopic.readKeyValuesToList(),
+                        is(Arrays.asList(
+                            KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+                            KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)"))
+                        )
                     );
                 }
                 if (materialized) {
                     assertThat(
-                        asMap(store),
+                        asList(store),

Review Comment:
   This seems potentially brittle. The problem is that we don't have any 
guarantee about the order in which the store adds records to the list, right? 
And that order could change. So for checking the store content, it seems we 
should keep using Map, and only switch to List when verifying the result topic.
   
   Cf 
https://github.com/apache/kafka/pull/19005/files#diff-ec133e43438f0cc66fc57a58758594758ec81cdb8e1039bcc949e613aa4d7b18R230



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to