mjsax commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r2026210742
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -644,16 +645,16 @@ public void
shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
);
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (materialized) {
assertThat(
- asMap(store),
+ asList(store),
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -712,12 +713,12 @@ public void
shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
);
- assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+ assertThat(outputTopic.readKeyValuesToList(), is(expected));
if (materialized) {
- assertThat(asMap(store), is(expected));
+ assertThat(asList(store), is(expected));
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -423,15 +428,15 @@ public void
shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
- final Map<String, String> expected =
- leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"))
: emptyMap();
+ final List<KeyValue<String, String>> expected =
+ leftJoin ? Collections.singletonList(KeyValue.pair("lhs1",
"(lhsValue1|rhs1,null)")) : emptyList();
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (materialized) {
assertThat(
- asMap(store),
+ asList(store),
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -753,23 +754,23 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final
String optimization,
final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
);
- assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+ assertThat(outputTopic.readKeyValuesToList(), is(expected));
if (materialized) {
- assertThat(asMap(store), is(expected));
+ assertThat(asList(store), is(expected));
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -307,12 +310,13 @@ public void doJoinFromRightThenDeleteRightEntity(final
boolean leftJoin,
left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(leftJoin
- ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
- mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
- : emptyMap()
+ ? Arrays.asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)"),
Review Comment:
nit: indentation
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -795,6 +796,12 @@ protected static Map<String, String> asMap(final
KeyValueStore<String, ValueAndT
return result;
}
+ protected static List<KeyValue<String, String>> asList(final
KeyValueStore<String, ValueAndTimestamp<String>> store) {
+ final List<KeyValue<String, String>> result = new LinkedList<>();
+ store.all().forEachRemaining(kv -> result.add(new KeyValue<>(kv.key,
kv.value.value()))); // TODO: find a better solution
Review Comment:
This looks ok to me?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -753,23 +754,23 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final
String optimization,
final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
);
- assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+ assertThat(outputTopic.readKeyValuesToList(), is(expected));
if (materialized) {
- assertThat(asMap(store), is(expected));
+ assertThat(asList(store), is(expected));
}
Assertions.assertNotNull(subscriptionStore.get(key));
}
left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|returnNull,null)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|returnNull,null)")
);
- assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+ assertThat(outputTopic.readKeyValuesToList(), is(expected));
if (materialized) {
- assertThat(asMap(store), is(expected));
+ assertThat(asList(store), is(expected));
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -662,16 +663,16 @@ public void
shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
// Change LHS foreign key reference
left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 3);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")
+ final List<KeyValue<String, String>> expected =
Collections.singletonList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs2,rhsValue2)")
);
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (materialized) {
assertThat(
- asMap(store),
+ asList(store),
Review Comment:
as above
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##########
@@ -203,26 +206,26 @@ public void doJoinFromLeftThenDeleteLeftEntity(final
boolean leftJoin,
left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
{
- final Map<String, String> expected = mkMap(
- mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
- mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
- );
+ final List<KeyValue<String, String>> expected = Arrays.asList(
+ KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+ KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)"));
+
assertThat(
- outputTopic.readKeyValuesToMap(),
+ outputTopic.readKeyValuesToList(),
is(expected)
);
if (rejoin) {
assertThat(
- rejoinOutputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
- mkEntry("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
- ))
+ rejoinOutputTopic.readKeyValuesToList(),
+ is(Arrays.asList(
+ KeyValue.pair("lhs1",
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+ KeyValue.pair("lhs2",
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)"))
+ )
);
}
if (materialized) {
assertThat(
- asMap(store),
+ asList(store),
Review Comment:
This seems potentially brittle. The problem is that we don't have any
guarantee about the order in which the store would add the records to the
list, right? And that order could change. So for checking the store content,
it seems we should keep using a Map, and only switch to a List when verifying
the result topic.
Cf.
https://github.com/apache/kafka/pull/19005/files#diff-ec133e43438f0cc66fc57a58758594758ec81cdb8e1039bcc949e613aa4d7b18R230
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]