gustavodemorais commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3437217193
########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperatorTest.java: ########## @@ -0,0 +1,1666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import 
org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.Phase; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_CHANGE_BUFFER_STATE_NAME; +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_TABLE_STATE_NAME; +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.PROBE_BUFFER_STATE_NAME; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.within; + +/** Harness tests for {@link LateralSnapshotJoinOperator}. */ +class LateralSnapshotJoinOperatorTest { + + // ----------------------------------------------------------------- Schema + + /** Probe row schema: (id BIGINT, key VARCHAR, val VARCHAR). 
*/ + private static final InternalTypeInfo<RowData> PROBE_TYPE = + InternalTypeInfo.ofFields( + new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); + + /** Build row schema: (key VARCHAR, val VARCHAR, rt BIGINT). */ + private static final InternalTypeInfo<RowData> BUILD_TYPE = + InternalTypeInfo.ofFields( + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType()); + + /** Joined output schema: probe ++ build = (id, pKey, pVal, bKey, bVal, bRt). */ + private static final LogicalType[] OUTPUT_TYPES = { + new BigIntType(), + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + new BigIntType() + }; + + /** Probe key column index (key VARCHAR is at field 1). */ + private static final int PROBE_KEY_IDX = 1; + + /** Build key column index (key VARCHAR is at field 0). */ + private static final int BUILD_KEY_IDX = 0; + + /** Build row-time column index (rt BIGINT is at field 2). */ + private static final int BUILD_RT_IDX = 2; + + private static final InternalTypeInfo<RowData> KEY_TYPE = + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE); + + private static final KeySelector<RowData, RowData> PROBE_KEY_SELECTOR = + nullSafeStringKeySelector(PROBE_KEY_IDX); + private static final KeySelector<RowData, RowData> BUILD_KEY_SELECTOR = + nullSafeStringKeySelector(BUILD_KEY_IDX); + + private static final RowDataHarnessAssertor JOINED_ASSERTOR = + new RowDataHarnessAssertor(OUTPUT_TYPES); + + // ----------------------------------------------------------------- Join conditions + + /** Trivial join condition that always matches (equality is enforced by partitioning). 
*/ + private static final String ALWAYS_TRUE_JOIN_FUNC_CODE = + "public class LateralSnapshotJoinConditionStub extends " + + "org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + " public LateralSnapshotJoinConditionStub(Object[] reference) {}\n" + + " @Override public boolean apply(" + + " org.apache.flink.table.data.RowData in1," + + " org.apache.flink.table.data.RowData in2) { return true; }\n" + + "}\n"; + + /** + * Join condition that only matches when the probe value (field 2) equals {@code "match"}. Used + * to verify that the codegen'd condition is actually invoked at join time. + */ + private static final String MATCH_VAL_JOIN_FUNC_CODE = + "public class LateralSnapshotJoinConditionMatchVal extends " + + "org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + " public LateralSnapshotJoinConditionMatchVal(Object[] reference) {}\n" + + " @Override public boolean apply(" + + " org.apache.flink.table.data.RowData in1," + + " org.apache.flink.table.data.RowData in2) {\n" + + " if (in1.isNullAt(2)) { return false; }\n" + + " return \"match\".equals(in1.getString(2).toString());\n" + + " }\n" + + "}\n"; + + private static GeneratedJoinCondition newTrueCondition() { + return new GeneratedJoinCondition( + "LateralSnapshotJoinConditionStub", ALWAYS_TRUE_JOIN_FUNC_CODE, new Object[0]); + } + + private static GeneratedJoinCondition newMatchValCondition() { + return new GeneratedJoinCondition( + "LateralSnapshotJoinConditionMatchVal", MATCH_VAL_JOIN_FUNC_CODE, new Object[0]); + } + + // ----------------------------------------------------------------- Operator / harness + // factories + + private static LateralSnapshotJoinOperator newOperator( + boolean isLeftOuterJoin, + GeneratedJoinCondition joinCondition, + boolean[] filterNullKeys, + Long loadCompletedTime, + Long loadCompletedIdleTimeoutMs, + 
Long stateTtlMs) { + + return new LateralSnapshotJoinOperator( + isLeftOuterJoin, + PROBE_TYPE, + BUILD_TYPE, + BUILD_RT_IDX, + joinCondition, + filterNullKeys, + loadCompletedTime, + loadCompletedIdleTimeoutMs, + stateTtlMs); + } + + private static LateralSnapshotJoinOperator newOperator( + boolean isLeftOuterJoin, + Long loadCompletedTime, + Long loadCompletedIdleTimeoutMs, + Long stateTtlMs) { + + return newOperator( + isLeftOuterJoin, + newTrueCondition(), + new boolean[] {true}, + loadCompletedTime, + loadCompletedIdleTimeoutMs, + stateTtlMs); + } + + private static KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> + newHarness(LateralSnapshotJoinOperator op) throws Exception { + return new KeyedTwoInputStreamOperatorTestHarness<>( + op, PROBE_KEY_SELECTOR, BUILD_KEY_SELECTOR, KEY_TYPE); + } + + private static KeySelector<RowData, RowData> nullSafeStringKeySelector(final int keyIdx) { + return value -> { + BinaryRowData ret = new BinaryRowData(1); + BinaryRowWriter writer = new BinaryRowWriter(ret); + if (value.isNullAt(keyIdx)) { + writer.setNullAt(0); + } else { + writer.writeString(0, value.getString(keyIdx)); + } + writer.complete(); + return ret; + }; + } + + // ----------------------------------------------------------------- LOAD phase + + @Test + void loadPhaseBuildSideChangeProcessing() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // During LOAD, build-side changes are buffered and later applied in event-time order. + // -D for a never-inserted (key, value) pair is defensively ignored. + addBuildChange(h, deleteRecord("k1", "ghost", 5L)); + // Two identical records (same key/val/row-time) → count(k1, v1, 20) = 2. 
+ addBuildChange(h, insertRecord("k1", "v1", 20L)); + addBuildChange(h, insertRecord("k1", "v1", 20L)); + // Earlier row-time than v1, but arrives later. + addBuildChange(h, insertRecord("k1", "v2", 10L)); + + // Still LOAD: changes are buffered, nothing applied or emitted yet. + assertPhase(op, Phase.LOAD); + assertThat(h.getOutput()).isEmpty(); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4); + assertThat(buildTableKeys(h)).isEmpty(); + + // Advance the build watermark (still below the flip point) and access k1 again. The + // access drains the buffered batch in event-time order before buffering the new change. + addBuildWm(h, 50L); + addBuildChange(h, insertRecord("k1", "v3", 30L)); + // TODO: also add updateBefore and updateAfter changes, to ensure that these are handled + // correctly + + assertPhase(op, Phase.LOAD); + // Applied in row-time order: -D(ghost)@5 (ignored), +I(v2)@10, +I(v1)@20 ×2. + assertThat(buildTableKeys(h)).containsExactly("k1"); + assertThat(buildTableForKey(h, op, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L)); + // The triggering v3 change is now buffered, awaiting the next drain. + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1); + } + } + + @Test + void loadPhaseProbeSideInputProcessing() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + + addBuildChange(h, insertRecord("k1", "build1", 1L)); + addBuildWm(h, 20L); + addProbeRecord(h, 1L, "k1", "probe-load-1"); + addProbeRecord(h, 2L, "k1", "probe-load-2"); + addProbeWm(h, 50L); + + assertPhase(op, Phase.LOAD); + // No output (records buffered, watermarks held back). 
+ assertThat(h.getOutput()).isEmpty(); + + assertThat(op.getCurrentProbeSideWm()).isEqualTo(50L); + assertThat(probeBufferKeys(h)).containsExactly("k1"); + assertThat(probeBufferForKey(h, op, "k1")).hasSize(2); + // Build-side change was applied to the build table. + assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty(); + assertThat(buildTableForKey(h, op, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("build1", 1L)); + } + } + + // ----------------------------------------------------------------- Flip / transition + + @ParameterizedTest(name = "leftOuter={0}, wmFlip={1}") + @CsvSource({"true, true", "true, false", "false, true", "false, false"}) + void flipDrainsProbeBufferAndJoins(boolean leftOuter, boolean wmFlip) throws Exception { + LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, 200L, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.setProcessingTime(0); + h.open(); + // LOAD: build state and buffered probes (one without a matching build row). + // k1 is a multi-set: count(v1)=2, count(v2)=1; k2 is a single row. 
+ addBuildChange(h, insertRecord("k1", "build-k1-v1", 1L)); + addBuildChange(h, insertRecord("k1", "build-k1-v1", 1L)); + addBuildChange(h, insertRecord("k1", "build-k1-v2", 1L)); + addBuildChange(h, insertRecord("k2", "build-k2", 1L)); + // LOAD: add probe-side records + addProbeRecord(h, 1L, "k1", "probe-1"); + addProbeRecord(h, 2L, "k2", "probe-2"); + addProbeRecord(h, 3L, "k2", "probe-3"); + addProbeRecord(h, 4L, "k3", "probe-no-match"); + addProbeWm(h, 80L); + assertPhase(op, Phase.LOAD); + + // assert that probe-side buffer is filled + assertThat(probeBufferForKey(h, op, "k1")).hasSize(1); + assertThat(probeBufferForKey(h, op, "k2")).hasSize(2); + assertThat(probeBufferForKey(h, op, "k3")).hasSize(1); + // idle-timeout timer is armed while in LOAD + assertThat(op.isIdleFlipTimerActive()).isTrue(); + + // trigger flip from LOAD to JOIN + if (wmFlip) { + // build WM crosses loadCompletedTime. + addBuildWm(h, 100L); + } else { + // proc-time exceeds idle timeout + h.setProcessingTime(200); + } + assertPhase(op, Phase.JOIN); + // idle-timeout timer is removed on flip (canceled by a WM flip, fired by an idle flip) + assertThat(op.isIdleFlipTimerActive()).isFalse(); + + // probe k1 joins k1's multi-set (count-respecting: 2x v1 + 1x v2 = three rows); + // probe k2 (2 rows) joins a single k2 build row; + // probe k3 doesn't have a matching build row. 
INNER: no output, LEFT OUTER: null-padded + assertWatermarkForwardedAfterRecords(h.getOutput(), 80L); + stripWatermarksAndStatusesFromOutput(h); + if (leftOuter) { + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L), + row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L), + row(1L, "k1", "probe-1", "k1", "build-k1-v2", 1L), + row(2L, "k2", "probe-2", "k2", "build-k2", 1L), + row(3L, "k2", "probe-3", "k2", "build-k2", 1L), + row(4L, "k3", "probe-no-match", null, null, null)); + } else { + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L), + row(1L, "k1", "probe-1", "k1", "build-k1-v1", 1L), + row(1L, "k1", "probe-1", "k1", "build-k1-v2", 1L), + row(2L, "k2", "probe-2", "k2", "build-k2", 1L), + row(3L, "k2", "probe-3", "k2", "build-k2", 1L)); + } + // Probe buffer drained on flip; build table preserved (k1 keeps multi-set counts). + assertThat(probeBufferKeys(h)).isEmpty(); + assertThat(buildTableForKey(h, op, "k1")) + .containsExactlyInAnyOrderEntriesOf( + Map.of("build-k1-v1", 2L, "build-k1-v2", 1L)); + assertThat(buildTableForKey(h, op, "k2")) + .containsExactlyInAnyOrderEntriesOf(Map.of("build-k2", 1L)); + } + } + + @Test + void idleTimerRearmsOnBuildWatermark() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 1000L, 100L, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.setProcessingTime(10); + h.open(); + h.setProcessingTime(60); + // Build WM advances → re-arm. + addBuildWm(h, 10L); + // Original idle deadline was 10+100=110. Re-armed to 60+100=160. 
+ h.setProcessingTime(159); + assertPhase(op, Phase.LOAD); + h.setProcessingTime(160); + assertPhase(op, Phase.JOIN); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void flipJoiningInvokesCodeGeneratedJoinCondition(boolean leftOuter) throws Exception { + LateralSnapshotJoinOperator op = + newOperator( + leftOuter, newMatchValCondition(), new boolean[] {true}, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addProbeRecord(h, 1L, "k1", "match"); + addProbeRecord(h, 2L, "k1", "skip"); + addProbeWm(h, 120L); + addBuildWm(h, 100L); + + assertWatermarkForwardedAfterRecords(h.getOutput(), 120L); + stripWatermarksAndStatusesFromOutput(h); + if (leftOuter) { + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "match", "k1", "v1", 1L), + row(2L, "k1", "skip", null, null, null)); + } else { + JOINED_ASSERTOR.shouldEmitAll(h, row(1L, "k1", "match", "k1", "v1", 1L)); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void flipJoiningCompositeEquiKeys(boolean leftOuter) throws Exception { + // Probe schema (kA VARCHAR, kB VARCHAR, val VARCHAR); build schema additionally carries a + // row-time attribute (kA VARCHAR, kB VARCHAR, val VARCHAR, rt BIGINT) at index 3. + InternalTypeInfo<RowData> probeType = + InternalTypeInfo.ofFields( + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); + InternalTypeInfo<RowData> buildType = + InternalTypeInfo.ofFields( + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + new BigIntType()); + final int buildRowtimeIndex = 3; + // Compose the composite key as a BinaryRowData with both key fields. 
+ KeySelector<RowData, RowData> selector = + value -> { + BinaryRowData ret = new BinaryRowData(2); + BinaryRowWriter writer = new BinaryRowWriter(ret); + if (value.isNullAt(0)) { + writer.setNullAt(0); + } else { + writer.writeString(0, value.getString(0)); + } + if (value.isNullAt(1)) { + writer.setNullAt(1); + } else { + writer.writeString(1, value.getString(1)); + } + writer.complete(); + return ret; + }; + InternalTypeInfo<RowData> compositeKeyType = + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); + + LateralSnapshotJoinOperator op = + new LateralSnapshotJoinOperator( + leftOuter, + probeType, + buildType, + buildRowtimeIndex, + newTrueCondition(), + new boolean[] {true, true}, + 100L, + null, + null); + + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + new KeyedTwoInputStreamOperatorTestHarness<>( + op, selector, selector, compositeKeyType)) { + h.open(); + // build rows: two identical rows for key a-1, a different row for key a-1, one row for + // key a-2. + addBuildChange(h, insertRecord("a", "1", "b-a-1-1", 1L)); + addBuildChange(h, insertRecord("a", "1", "b-a-1-1", 1L)); + addBuildChange(h, insertRecord("a", "1", "b-a-1-2", 1L)); + addBuildChange(h, insertRecord("a", "2", "b-a-2-1", 1L)); + // probes: matching composite, non-matching composite. 
+ h.processElement1(insertRecord("a", "1", "p-a-1")); + h.processElement1(insertRecord("a", "9", "p-a-9")); + h.processElement1(insertRecord("b", "1", "p-b-1")); + h.processElement1(insertRecord("b", "9", "p-b-9")); + // flip to JOIN phase + addBuildWm(h, 100L); + + LogicalType[] outTypes = { + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + new BigIntType() + }; + RowDataHarnessAssertor compositeAssertor = new RowDataHarnessAssertor(outTypes); + + stripWatermarksAndStatusesFromOutput(h); + if (leftOuter) { + compositeAssertor.shouldEmitAll( + h, + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L), + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L), + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-2", 1L), + compKeyRow("a", "9", "p-a-9", null, null, null, null), + compKeyRow("b", "1", "p-b-1", null, null, null, null), + compKeyRow("b", "9", "p-b-9", null, null, null, null)); + } else { + compositeAssertor.shouldEmitAll( + h, + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L), + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-1", 1L), + compKeyRow("a", "1", "p-a-1", "a", "1", "b-a-1-2", 1L)); + } + } + } + + // ----------------------------------------------------------------- JOIN phase + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void joinPhaseImmediateInnerJoin(boolean leftOuter) throws Exception { + LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + // First probe WM + addProbeWm(h, 10L); + assertWatermarkForwardedAfterRecords(h.getOutput(), 10L); + + // First probe — joined immediately. 
+ addProbeRecord(h, 1L, "k1", "probe-immediate-1"); + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll(h, row(1L, "k1", "probe-immediate-1", "k1", "v1", 1L)); + + // Another probe WM + addProbeWm(h, 20L); + assertWatermarkForwardedAfterRecords(h.getOutput(), 20L); + + // Second probe for same key — joined immediately. + addProbeRecord(h, 2L, "k1", "probe-immediate-2"); + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll(h, row(2L, "k1", "probe-immediate-2", "k1", "v1", 1L)); + + // Probe for non-existent key — no output (INNER). + addProbeRecord(h, 3L, "k2", "probe-no-match"); + stripWatermarksAndStatusesFromOutput(h); + if (leftOuter) { + JOINED_ASSERTOR.shouldEmitAll(h, row(3L, "k2", "probe-no-match", null, null, null)); + } else { + assertThat(h.extractOutputStreamRecords()).isEmpty(); + } + + // one more probe WM + addProbeWm(h, 30L); + assertWatermarkForwardedAfterRecords(h.getOutput(), 30L); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void joinPhaseBuildSideChangeApplication(boolean appliedByBuild) throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // Two identical records for k1 (count 2) at row-time 1, buffered during LOAD. + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + // Buffer four changes for k1 + addBuildChange(h, deleteRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v2", 102L)); + addBuildChange(h, updateBeforeRecord("k1", "v1", 1L)); + addBuildChange(h, updateAfterRecord("k1", "v3", 103L)); + // Buffer one change for k2. 
+ addBuildChange(h, insertRecord("k2", "v1", 101L)); + + // assert number of buffered changes + assertThat(op.getCurrentBuildSideWm()).isEqualTo(100L); + assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(100L); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4); + assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L); + assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1); + assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v1", 2L)); + + // Probe record with no build WM advance - changes are not applied yet + addProbeRecord(h, 1L, "k1", "p-1"); + JOINED_ASSERTOR.shouldEmitAll( + h, row(1L, "k1", "p-1", "k1", "v1", 1L), row(1L, "k1", "p-1", "k1", "v1", 1L)); + + // increment build-side WM + addBuildWm(h, 110L); + + // assert that all changes are still buffered + assertThat(op.getCurrentBuildSideWm()).isEqualTo(110L); + assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(100L); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4); + assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L); + assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1); + + // trigger application of k1 changes by build or probe-side input + if (!appliedByBuild) { + addBuildChange(h, insertRecord("k1", "v4", 111L)); + // assert that changes have been applied and removed from buffer + // the triggering change is appended to the buffer + assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(110L); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1); + assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v2", 1L, "v3", 1L)); + // assert empty output + assertThat(h.getOutput()).isEmpty(); + } else { + addProbeRecord(h, 2L, "k1", "p-2"); + // assert that changes have been applied and removed from buffer + assertThat(bufferedAtWmFor(h, op, "k1")).isNull(); + assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty(); + assertThat(buildTableForKey(h, op, "k1")) + .isEqualTo(Map.of("v2", 1L, "v3", 1L)); // / + // assert join results (v2 carries 
row-time 2, v3 carries row-time 1) + JOINED_ASSERTOR.shouldEmitAll( + h, + row(2L, "k1", "p-2", "k1", "v2", 102L), + row(2L, "k1", "p-2", "k1", "v3", 103L)); + } + + // assert that k2 change is still buffered + assertThat(bufferedAtWmFor(h, op, "k2")).isEqualTo(100L); + assertThat(bufferedChangesForKey(h, op, "k2")).hasSize(1); + // apply k2 change and join + addProbeRecord(h, 3L, "k2", "p-3"); + assertThat(bufferedAtWmFor(h, op, "k2")).isNull(); + assertThat(bufferedChangesForKey(h, op, "k2")).isEmpty(); + assertThat(buildTableForKey(h, op, "k2")).isEqualTo(Map.of("v1", 1L)); + // assert join results + JOINED_ASSERTOR.shouldEmitAll(h, row(3L, "k2", "p-3", "k2", "v1", 101L)); + } + } + + @Test + void joinPhaseWmForwardingLogic() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); // flip + assertPhase(op, Phase.JOIN); + + // Build-side WMs after flip are not forwarded. + addBuildWm(h, 200L); + assertThat(extractWatermarks(h.getOutput())).isEmpty(); + + // Probe-side WMs in JOIN are forwarded. + addProbeWm(h, 150L); + assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(150)); + h.getOutput().clear(); + + // another build-side WM + addBuildWm(h, 300L); + assertThat(extractWatermarks(h.getOutput())).isEmpty(); + + addProbeWm(h, 250L); + assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(250)); + } + } + + /** + * For buffered build-side changes sharing the same row-time, retractions are applied before + * accumulations. 
+ */ + @Test + void joinPhaseAppliesRetractionsBeforeAccumulationsAtEqualRowTime() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // Drive into JOIN with empty build state for k1/k2. + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + // Buffer accumulation-first at row-time 105 on absent rows: + // k1: +I then -D ; k2: +U then -U ; k3: +U then -U + addBuildChange(h, insertRecord("k1", "v1", 105L)); + addBuildChange(h, deleteRecord("k1", "v1", 105L)); + addBuildChange(h, updateAfterRecord("k2", "v1", 105L)); + addBuildChange(h, updateBeforeRecord("k2", "v1", 105L)); + + // Advance build WM, then access each key to drain its buffer in event-time order. + addBuildWm(h, 200L); + addProbeRecord(h, 1L, "k1", "p1"); + addProbeRecord(h, 2L, "k2", "p2"); + + assertThat(buildTableForKey(h, op, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L)); + assertThat(buildTableForKey(h, op, "k2")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L)); + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "p1", "k1", "v1", 105L), + row(2L, "k2", "p2", "k2", "v1", 105L)); + } + } + + // ----------------------------------------------------------------- NULL keys + + @ParameterizedTest + @CsvSource({"true, true", "true, false", "false, true", "false, false"}) + void joinRespectsNullKeysFilter(boolean leftOuter, boolean filterNullKey) throws Exception { + LateralSnapshotJoinOperator op = + newOperator( + leftOuter, + newTrueCondition(), + new boolean[] {filterNullKey}, + 100L, + null, + null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + addBuildChange(h, insertRecord(null, "v_null", 1L)); + addProbeRecord(h, 1L, null, "p_null"); + addBuildWm(h, 100L); + + // test joining during LOAD -> JOIN 
transition + if (filterNullKey) { + if (leftOuter) { + JOINED_ASSERTOR.shouldEmitAll(h, row(1L, null, "p_null", null, null, null)); + } else { + assertThat(h.getOutput()).isEmpty(); + } + } else { + JOINED_ASSERTOR.shouldEmitAll(h, row(1L, null, "p_null", null, "v_null", 1L)); + } + + // test joining in JOIN phase + addProbeRecord(h, 2L, null, "p_null"); + if (filterNullKey) { + if (leftOuter) { + JOINED_ASSERTOR.shouldEmitAll(h, row(2L, null, "p_null", null, null, null)); + } else { + assertThat(h.getOutput()).isEmpty(); + } + } else { + JOINED_ASSERTOR.shouldEmitAll(h, row(2L, null, "p_null", null, "v_null", 1L)); + } + } + } + + // ----------------------------------------------------------------- Watermark status + + @Test + void buildSideWmAndWmStatusForwarding() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // probe-side WM + addProbeWm(h, 70L); + + // LOAD phase: build-side becomes idle then active again. WM is advanced. + h.processWatermarkStatus2(WatermarkStatus.IDLE); + h.processWatermarkStatus2(WatermarkStatus.ACTIVE); + addBuildWm(h, 50L); + // assert that no WMs or WM statuses are emitted in LOAD. + assertThat(h.getOutput()).isEmpty(); + + // Drive into JOIN + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + // assert that only the probe-side WM was forwarded after the transition + assertThat(h.getOutput()).containsExactly(new Watermark(70L)); + h.getOutput().clear(); + + // JOIN phase: build-side becomes idle then active again. WM is advanced + h.processWatermarkStatus2(WatermarkStatus.IDLE); + h.processWatermarkStatus2(WatermarkStatus.ACTIVE); + addBuildWm(h, 200L); + + // No records, WMs, or WM statuses emitted in JOIN. 
+ assertThat(h.getOutput()).isEmpty(); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void probeSideWmAndWmStatusForwarding(boolean probeIdleDuringLoad) throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + addProbeWm(h, 25L); + addProbeWm(h, 50L); + // Probe-side WM statuses received during LOAD. + h.processWatermarkStatus1(WatermarkStatus.IDLE); + if (!probeIdleDuringLoad) { + h.processWatermarkStatus1(WatermarkStatus.ACTIVE); + } + // Absorbed during LOAD — nothing emitted. + assertThat(extractWatermarks(h.getOutput())).isEmpty(); + assertThat(extractWatermarkStatuses(h.getOutput())).isEmpty(); + + // Flip to JOIN phase — last probe WM emitted. + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + if (probeIdleDuringLoad) { + assertThat(extractWatermarkStatuses(h.getOutput())) + .containsExactly(WatermarkStatus.IDLE); + } else { + assertThat(extractWatermarkStatuses(h.getOutput())).isEmpty(); + } + assertThat(extractWatermarks(h.getOutput())).containsExactly(new Watermark(50)); + h.getOutput().clear(); + + // WMs and WM status updates received during JOIN are forwarded. 
+ addProbeWm(h, 150L); + assertThat(h.getOutput()).containsExactly(new Watermark(150)); + h.getOutput().clear(); + // set probe-side to idle + h.processWatermarkStatus1(WatermarkStatus.IDLE); + assertThat(h.getOutput()).containsExactly(WatermarkStatus.IDLE); + h.getOutput().clear(); + // set probe-side to active + h.processWatermarkStatus1(WatermarkStatus.ACTIVE); + assertThat(h.getOutput()).containsExactly(WatermarkStatus.ACTIVE); + h.getOutput().clear(); + // emit another watermark + addProbeWm(h, 200L); + assertThat(h.getOutput()).containsExactly(new Watermark(200)); + h.getOutput().clear(); + } + } + + // ----------------------------------------------------------------- State TTL + + @Test + void stateTtlRefreshesOnAccessAndEvictsInactiveKeys() throws Exception { + // stateTtlMs = 100. Timers are registered at 1.5 × stateTtlMs, so the deadline for access + // at t=0 is 150. + LateralSnapshotJoinOperator op = newOperator(false, 50L, null, 100L); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.setProcessingTime(0); + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k2", "v1", 1L)); + addBuildChange(h, insertRecord("k3", "v1", 1L)); + addBuildChange(h, insertRecord("k4", "v1", 1L)); + + // State is NOT evicted during LOAD even after the deadline passes (TTL fires are + // rescheduled past the LOAD phase). + h.setProcessingTime(200); + assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3", "k4"); + + // flip to JOIN + addBuildWm(h, 50L); + assertPhase(op, Phase.JOIN); + + // Touch k1, k2, k3 to reset their TTL in JOIN; leave k4 alone. 
+ h.setProcessingTime(275); + assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3", "k4"); + addBuildChange(h, insertRecord("k1", "v2", 2L)); + addBuildChange(h, insertRecord("k2", "v2", 2L)); + addBuildChange(h, insertRecord("k3", "v2", 2L)); + + // k4 evicted: it wasn't accessed since proc-time (0) and we flipped to JOIN at (200) + h.setProcessingTime(350); + assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3"); + + // Update build state for k1 and k2; leave k3 alone + addBuildWm(h, 60L); + h.setProcessingTime(400); + assertThat(buildStateKeys(h)).containsExactly("k1", "k2", "k3"); + addBuildChange(h, insertRecord("k1", "v3", 3L)); + addBuildChange(h, insertRecord("k2", "v3", 3L)); + + // k3 evicted, not accessed since (275) + addBuildWm(h, 70L); + h.setProcessingTime(475); + assertThat(buildStateKeys(h)).containsExactly("k1", "k2"); + // Access k1 from probe side to reset its TTL again; leave k2 alone + addProbeRecord(h, 1L, "k1", "p1"); + + // k2 evicted, not accessed since (400) + h.setProcessingTime(550); + assertThat(buildStateKeys(h)).containsExactly("k1"); + + // k1 finally evicted. + h.setProcessingTime(700); + assertThat(buildStateKeys(h)).isEmpty(); + + // The probe joined against the (k1) multi-set state at proc-time 475 — entries v1, + // v2, v3. + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "p1", "k1", "v1", 1L), + row(1L, "k1", "p1", "k1", "v2", 2L), + row(1L, "k1", "p1", "k1", "v3", 3L)); + } + } + + @Test + void stateTtlClearsAllPerKeyState() throws Exception { + // stateTtlMs = 100. Timers are registered at 1.5 × stateTtlMs. 
+ LateralSnapshotJoinOperator op = newOperator(false, 50L, null, 100L); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.setProcessingTime(0); + h.open(); + // Buffered during LOAD + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 50L); // flip to JOIN + assertPhase(op, Phase.JOIN); + + // Buffer a change in JOIN that does NOT drain (no further WM advance / access). This + // populates the change buffer and the buffered-at tag while the build table keeps {v1}. + addBuildChange(h, insertRecord("k1", "v2")); + assertThat(buildTableForKey(h, op, "k1")).isEqualTo(Map.of("v1", 1L)); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(1); + assertThat(bufferedAtWmFor(h, op, "k1")).isEqualTo(50L); + assertThat(ttlExpiryFor(h, op, "k1")).isEqualTo(150L); + + // Fire the TTL timer (well past the eviction deadline). + h.setProcessingTime(1000); + + // Every per-key state object is cleared, not just the build table. + assertThat(buildTableForKey(h, op, "k1")).isEmpty(); + assertThat(bufferedChangesForKey(h, op, "k1")).isEmpty(); + assertThat(bufferedAtWmFor(h, op, "k1")).isNull(); + assertThat(probeBufferForKey(h, op, "k1")).isEmpty(); + assertThat(ttlExpiryFor(h, op, "k1")).isNull(); + } + } + + @Test + void stateTtlRestoreResetsFlipProcTime() throws Exception { + // stateTtlMs = 50. Build write at t=0 arms TTL at 75. Flip at t=0 → flipProcTime=0. + // Original grace ends at 0+50=50. + LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, 50L); + OperatorSubtaskState state; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op1)) { + h.setProcessingTime(0); + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); + assertThat(op1.getFlipProcTime()).isEqualTo(0L); + state = h.snapshot(0L, 0L); + } + + // Restart at t=30 — flipProcTime is re-anchored to 30; new grace ends at 30+50=80. 
+ LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, 50L); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op2)) { + h.setProcessingTime(30); + h.initializeState(state); + h.open(); + assertThat(op2.getFlipProcTime()).isEqualTo(30L); + + // At t=75 the recovered TTL timer fires. Grace check: now=75 < flipProcTime+stateTtlMs + // = 80 → reschedule rather than evict. + h.setProcessingTime(75); + + // k1 still present because the grace window was re-anchored. + assertThat(buildStateKeys(h)).containsExactly("k1"); + + // advance proc-time to 105 to evict state + h.setProcessingTime(105); + assertThat(buildStateKeys(h)).isEmpty(); + } + } + + // ----------------------------------------------------------------- Snapshot / restore + + @Test + void restoreFromLoadPhaseSnapshot() throws Exception { + LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, null); + OperatorSubtaskState state; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op1)) { + h.open(); + // Build-side multi-set with a duplicate count, plus a buffered + // probe. + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v2", 2L)); + addBuildWm(h, 10L); + addBuildChange(h, insertRecord("k1", "v3", 3L)); + addProbeRecord(h, 1L, "k1", "p1"); + assertPhase(op1, Phase.LOAD); + assertThat(probeBufferKeys(h)).containsExactly("k1"); + state = h.snapshot(0L, 0L); + } + + LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op2)) { + h.initializeState(state); + h.open(); + assertPhase(op2, Phase.LOAD); + // Buffered probe and build-table multi-set (with counts) preserved across restore. 
+ assertThat(probeBufferKeys(h)).containsExactly("k1"); + assertThat(buildTableForKey(h, op2, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L)); + assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(1); + // Gauge is rebuilt from restored keyed state: one probe buffered for k1. + assertThat(op2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(1L); + + // Trigger flip; the buffered probe is joined post-restore, count-respecting against the + // restored multi-set (2× v1 + 1× v2 = three rows). + addProbeWm(h, 50L); + addBuildWm(h, 100L); + assertPhase(op2, Phase.JOIN); + assertThat(probeBufferKeys(h)).isEmpty(); + // Draining the restored probe and build buffers brings both gauges back to 0. + assertThat(op2.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + assertThat(buildTableForKey(h, op2, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L, "v3", 1L)); + + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "p1", "k1", "v1", 1L), + row(1L, "k1", "p1", "k1", "v1", 1L), + row(1L, "k1", "p1", "k1", "v2", 2L), + row(1L, "k1", "p1", "k1", "v3", 3L)); + } + } + + @Test + void restoreFromMixedPhaseSnapshot() throws Exception { + // Subtask A: drive into JOIN with a buffered change for k1. + LateralSnapshotJoinOperator opA = newOperator(false, 100L, null, null); + OperatorSubtaskState stateA; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(opA)) { + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); + assertPhase(opA, Phase.JOIN); + // Buffer a build-side change + addBuildChange(h, insertRecord("k1", "v1", 1L)); + stateA = h.snapshot(0L, 0L); + } + + // Subtask B: stay in LOAD (no flip-triggering build WM). 
+ LateralSnapshotJoinOperator opB = newOperator(false, 100L, null, null); + OperatorSubtaskState stateB; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(opB)) { + h.open(); + addProbeRecord(h, 1L, "k2", "p1"); + assertPhase(opB, Phase.LOAD); + stateB = h.snapshot(0L, 0L); + } + + OperatorSubtaskState combined = + AbstractStreamOperatorTestHarness.repackageState(stateA, stateB); + + // Restore the combined state — phase must be LOAD because some subtask was LOAD. + LateralSnapshotJoinOperator opC = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(opC)) { + h.initializeState(combined); + h.open(); + assertPhase(opC, Phase.LOAD); + + // assert state: A's build table ({v1:1}) plus A's buffered change and B's probe. + assertThat(buildTableKeys(h)).containsExactly("k1"); + assertThat(buildTableForKey(h, opC, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L)); + assertThat(bufferedChangesForKey(h, opC, "k1")).hasSize(1); + assertThat(probeBufferKeys(h)).containsExactly("k2"); + + // During LOAD the current build WM is still MIN_VALUE, so the recovered buffer (tagged + // at the pre-restore WM) is not drained; the new change is appended to it. + addBuildChange(h, insertRecord("k1", "v2", 2L)); + assertThat(bufferedChangesForKey(h, opC, "k1")).hasSize(2); + assertThat(buildTableForKey(h, opC, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 1L)); + + // Trigger flip + addBuildWm(h, 100L); + assertPhase(opC, Phase.JOIN); + + // Access k1: the buffered changes drain in event-time order, then the probe joins. 
+ addProbeRecord(h, 1L, "k1", "p1"); + assertThat(bufferedChangesForKey(h, opC, "k1")).isEmpty(); + assertThat(buildTableForKey(h, opC, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v1", 2L, "v2", 1L)); + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k1", "p1", "k1", "v1", 1L), + row(1L, "k1", "p1", "k1", "v1", 1L), + row(1L, "k1", "p1", "k1", "v2", 2L)); + } + } + + @Test + void restoreFromJoinPhaseSnapshot() throws Exception { + LateralSnapshotJoinOperator op1 = newOperator(false, 100L, null, null); + OperatorSubtaskState state; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op1)) { + h.open(); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k2", "v1", 1L)); + addBuildWm(h, 100L); + assertPhase(op1, Phase.JOIN); + // Buffer a -U/+U pair for each key (tagged at bufferedAt = 100). + addBuildChange(h, updateBeforeRecord("k1", "v1", 1L)); + addBuildChange(h, updateAfterRecord("k1", "v2", 101L)); + addBuildChange(h, updateBeforeRecord("k2", "v1", 1L)); + addBuildChange(h, updateAfterRecord("k2", "v2", 101L)); + state = h.snapshot(0L, 0L); + } + + LateralSnapshotJoinOperator op2 = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op2)) { + h.initializeState(state); + h.open(); + assertPhase(op2, Phase.JOIN); + assertThat(op2.getCurrentBuildSideWm()).isEqualTo(Long.MIN_VALUE); + assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(2); + assertThat(bufferedChangesForKey(h, op2, "k2")).hasSize(2); + // Gauge is rebuilt from restored keyed state: four build-side changes buffered + // (two for k1, two for k2). + assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(4L); + + // k2: accessed while no build WM has arrived since restore → eager drain. 
+ addProbeRecord(h, 1L, "k2", "p-k2"); + assertThat(bufferedChangesForKey(h, op2, "k2")).isEmpty(); + // Draining k2's two restored changes leaves k1's two still buffered. + assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(2L); + assertThat(buildTableForKey(h, op2, "k2")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v2", 1L)); + + // k1: still buffered (eager drain of k2 left latestBuildSideWm at MIN_VALUE); advance + // the build WM, then access → normal WM-gated drain. + assertThat(bufferedChangesForKey(h, op2, "k1")).hasSize(2); + addBuildWm(h, 200L); + addProbeRecord(h, 2L, "k1", "p-k1"); + assertThat(bufferedChangesForKey(h, op2, "k1")).isEmpty(); + // All restored changes drained; gauge back to 0. + assertThat(op2.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + assertThat(buildTableForKey(h, op2, "k1")) + .containsExactlyInAnyOrderEntriesOf(Map.of("v2", 1L)); + + stripWatermarksAndStatusesFromOutput(h); + JOINED_ASSERTOR.shouldEmitAll( + h, + row(1L, "k2", "p-k2", "k2", "v2", 101L), + row(2L, "k1", "p-k1", "k1", "v2", 101L)); + } + } + + @Test + void restoreRearmsIdleFlipTimer() throws Exception { + LateralSnapshotJoinOperator op1 = newOperator(false, 1000L, 100L, null); + OperatorSubtaskState state; + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op1)) { + h.setProcessingTime(50); + h.open(); + assertPhase(op1, Phase.LOAD); + state = h.snapshot(0L, 0L); + } + + LateralSnapshotJoinOperator op2 = newOperator(false, 1000L, 100L, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op2)) { + h.setProcessingTime(150); + h.initializeState(state); + h.open(); + assertPhase(op2, Phase.LOAD); + // Re-armed at open()'s proc-time + idleTimeout = 150 + 100 = 250. 
+ h.setProcessingTime(249); + assertPhase(op2, Phase.LOAD); + h.setProcessingTime(250); + assertPhase(op2, Phase.JOIN); + } + } + + // ----------------------------------------------------------------- Metrics + + @Test + void currentPhaseGaugeReflectsPhase() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // LOAD = ordinal 0. + assertThat(op.getCurrentPhaseGauge().getValue()).isEqualTo(0); + + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + // JOIN = ordinal 1. + assertThat(op.getCurrentPhaseGauge().getValue()).isEqualTo(1); + } + } + + @Test + void probeBufferedGaugeTracksLoadBufferingAndDrainsOnFlip() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + + addProbeRecord(h, 1L, "k1", "p1"); + assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(1L); + addProbeRecord(h, 2L, "k2", "p2"); + assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(2L); + addProbeRecord(h, 3L, "k1", "p3"); + assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(3L); + + // Flip drains all buffered probes across keys. 
+ addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + assertThat(op.getNumProbeSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + } + } + + @Test + void buildBufferedGaugeTracksJoinBufferingAndDrains() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // Flip to JOIN with an empty build state (no build records during LOAD). + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + + // JOIN-phase build changes are buffered (their tag equals the current build WM). + addBuildChange(h, insertRecord("k1", "v2", 101L)); + addBuildChange(h, insertRecord("k1", "v3", 101L)); + assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(2L); + addBuildChange(h, insertRecord("k2", "v1", 102L)); + assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(3L); + + // Advance build WM, then drain each key by accessing it from the probe side. + addBuildWm(h, 110L); + addProbeRecord(h, 1L, "k1", "p1"); + assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(1L); + addProbeRecord(h, 2L, "k2", "p2"); + assertThat(op.getNumBuildSideRecordsBufferedGauge().getValue()).isEqualTo(0L); + } + } + + @Test + void watermarkGaugesTrackBuildAndProbeWatermarks() throws Exception { + // High loadCompletedTime so the operator stays in LOAD for the first build WMs. 
+ LateralSnapshotJoinOperator op = newOperator(false, 1000L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(Long.MIN_VALUE); + assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(Long.MIN_VALUE); + + // LOAD phase: both watermarks are tracked even though nothing is forwarded. + addBuildWm(h, 50L); + assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(50L); + addProbeWm(h, 70L); + assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(70L); + assertPhase(op, Phase.LOAD); + + // Flip to JOIN. + addBuildWm(h, 1000L); + assertPhase(op, Phase.JOIN); + assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(1000L); + + // JOIN phase: probe WM gauge keeps tracking (guards the dedicated currentProbeWm + // field). + addProbeWm(h, 1500L); + assertThat(op.getCurrentProbeSideWatermarkGauge().getValue()).isEqualTo(1500L); + addBuildWm(h, 1100L); + assertThat(op.getCurrentBuildSideWatermarkGauge().getValue()).isEqualTo(1100L); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void fanOutGaugesTrackMaxAndAverage(boolean leftOuter) throws Exception { + LateralSnapshotJoinOperator op = newOperator(leftOuter, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // k1 multi-set: v1 x2, v2 x1 → fan-out 3; k2 single row → fan-out 1. 
+ addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v1", 1L)); + addBuildChange(h, insertRecord("k1", "v2", 1L)); + addBuildChange(h, insertRecord("k2", "v1", 1L)); + // a probe record during load + addProbeRecord(h, 1L, "k2", "p1"); // fan-out 1 + // a probe record that does not match + addProbeRecord(h, 2L, "k3", "p2"); // fan-out INNER: 0, LEFT OUTER: 1 + addBuildWm(h, 100L); + assertPhase(op, Phase.JOIN); + + // check merge stats after transition + assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(1L); + if (leftOuter) { + assertThat(op.getAvgJoinFanOutGauge().getValue()).isEqualTo(2d / 2); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L); + } else { + assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(1d / 2, within(1e-9)); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(1L); + } + + // join probe in JOIN phase + addProbeRecord(h, 3L, "k1", "p3"); // fan-out 3 + assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(3L); + if (leftOuter) { + assertThat(op.getAvgJoinFanOutGauge().getValue()).isEqualTo(5d / 3, within(1e-9)); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L); + } else { + assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(4d / 3, within(1e-9)); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(1L); + } + + addProbeRecord(h, 3L, "k3", "no-match"); + assertThat(op.getMaxJoinFanOutGauge().getValue()).isEqualTo(3L); + if (leftOuter) { + // fan-out 1 (LEFT OUTER, null-padded) + assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(6.0d / 4, within(1e-9)); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(0L); + } else { + // fan-out 0 (INNER, unmatched) + assertThat(op.getAvgJoinFanOutGauge().getValue()).isCloseTo(4.0d / 4, within(1e-9)); + assertThat(op.getNumUnmatchedProbeRecords().getCount()).isEqualTo(2L); + } + } + } + + @Test + void 
numUnmatchedBuildRetractionsCountsAbsentRowRetractions() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // LOAD: a -D for a never-inserted row is counted. + addBuildChange(h, deleteRecord("k1", "ghost", 1L)); + addBuildChange(h, insertRecord("k1", "v1", 2L)); + assertThat(op.getNumUnmatchedBuildRetractions().getCount()).isEqualTo(0L); Review Comment: Two test suggestions: - **ref-count underflow**: right now only the metric is asserted - could we also check the multiset map itself? E.g. that a `-D` for a never-inserted row does not leave a negative count behind, and that a `-D` on a count-1 row removes the key instead of leaving a zero entry. - **atomic `-U`/`+U` visibility mid-buffer**: it'd be great to have a case where a probe joins *while* a `-U`/`+U` pair is still buffered (it should see the old value), then re-probes after the build-side watermark advances (it should see the new value) - that's really the whole point of the buffer, so it's worth pinning down -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
