fhueske commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3453764499
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java: ########## @@ -0,0 +1,1038 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.KeyedStateBackend; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.metrics.SimpleGauge; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +/** + * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join. 
+ * + * <p>The operator runs in two operator-wide phases that progress only forward, LOAD then JOIN: LOAD + * bootstraps the build-side table from its changelog up to {@code loadCompletedTime}, buffering + * probe rows meanwhile; JOIN then continuously joins probe rows against the materialized build-side + * state. How each input is handled depends on the current phase: + * + * <ul> + * <li>Build-side (input2 / right) changes are handled the same way in both phases: they are + * buffered in {@code buildChangeBuffer} and applied lazily to a per-key multi-set in {@code + * buildTableState} on the next per-key access (build- or probe-side) once the build-side + * watermark has advanced past the buffer's tag, or at the flip. The buffered changelog is + * applied in event-time order: changes are sorted by the build-side row-time attribute + * ({@code buildRowtimeIndex}), and for equal row-times retractions ({@code -U}/{@code -D}) + * are applied before accumulations ({@code +U}/{@code +I}). Buffering until the watermark + * passes preserves atomic update visibility across {@code -U}/{@code +U} pairs in JOIN phase. + * <li>Probe-side (input1 / left) records are handled differently per phase. During LOAD they are + * buffered in {@code probeBuffer} until the configured flip point is reached on the + * build-side watermark, at which point a per-key event-time timer drains the buffered probes + * and joins them with the materialized build-side state. During JOIN they are joined + * immediately with the current build-side state. + * </ul> + * + * <p>Watermark forwarding rules: + * + * <ul> + * <li>Build-side watermarks are never forwarded downstream. + * <li>Probe-side watermarks are held back during LOAD and forwarded during JOIN phase. 
+ * </ul> + * + * <p>The flip from LOAD to JOIN phase is triggered by either: + * + * <ul> + * <li>the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or + * <li>the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side + * watermark advance. + * </ul> + * + * <p>State TTL eviction happens during JOIN phase and is implemented with keyed processing-time + * timers (matching the semantics of Flink's standard {@code StateTtlConfig}). + */ +@Internal +public class LateralSnapshotJoinOperator extends AbstractStreamOperator<RowData> + implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, String> { + + // -------------------------- static final definitions -------------------------- + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LateralSnapshotJoinOperator.class); + + /** operator state names */ + private static final String OPERATOR_PHASE_STATE_NAME = "lateral-snapshot-phase"; + + /** keyed state names */ + @VisibleForTesting static final String BUILD_TABLE_STATE_NAME = "build-table"; + + @VisibleForTesting static final String BUILD_CHANGE_BUFFER_STATE_NAME = "build-change-buffer"; + @VisibleForTesting static final String BUFFERED_AT_WM_STATE_NAME = "buffered-at-wm"; + @VisibleForTesting static final String PROBE_BUFFER_STATE_NAME = "probe-buffer"; + private static final String TTL_EXPIRY_STATE_NAME = "ttl-expiry"; + + /** timer service and namespace names */ + private static final String TIMER_SERVICE_NAME = "lateral-snapshot-timers"; + + @VisibleForTesting static final String NS_FLIP = "flip"; + @VisibleForTesting static final String NS_TTL = "ttl"; + + /** + * Event-time timestamp at which the per-key {@code probeBuffer} flip join timer is registered. + * Any non-{@code MIN_VALUE} watermark advance fires it. 
+ */ + @VisibleForTesting static final long FLIP_JOIN_TIMER_TS = 1L; + + /** Gauge: probe-side records currently buffered during LOAD. */ + @VisibleForTesting static final String M_NUM_PROBE_BUFFERED = "numProbeSideRecordsBuffered"; + + /** Gauge: build-side changes currently buffered during JOIN. */ + @VisibleForTesting static final String M_NUM_BUILD_BUFFERED = "numBuildSideRecordsBuffered"; + + /** Counter: keys evicted from state by TTL. */ + @VisibleForTesting static final String M_NUM_STATE_TTL_EVICTIONS = "numStateTtlEvictions"; + + /** Gauge: latest build-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_BUILD_WM = "currentBuildSideWatermark"; + + /** Gauge: latest probe-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_PROBE_WM = "currentProbeSideWatermark"; + + /** Gauge: current phase ordinal, 0 = LOAD, 1 = JOIN. */ + @VisibleForTesting static final String M_CURRENT_PHASE = "currentPhase"; + + /** Gauge: max joined records emitted for a probe-side record. */ + @VisibleForTesting static final String M_MAX_JOIN_FAN_OUT = "maxJoinFanOut"; + + /** Gauge: average joined records emitted per probe-side record. */ + @VisibleForTesting static final String M_AVG_JOIN_FAN_OUT = "avgJoinFanOut"; + + /** Counter: probe-side records without a match. */ + @VisibleForTesting static final String M_NUM_UNMATCHED_PROBE = "numUnmatchedProbeRecords"; + + /** Counter: build-side retractions for a row not present in state. */ + @VisibleForTesting + static final String M_NUM_UNMATCHED_BUILD_RETRACTIONS = "numUnmatchedBuildRetractions"; + + /** Two-phase state machine. */ + public enum Phase { + LOAD, + JOIN + } + + /** What triggered the {@link Phase#LOAD} to {@link Phase#JOIN} flip (for logging). 
*/ + private enum FlipTrigger { + BUILD_WATERMARK, + IDLE_TIMEOUT + } + + // -------------------------- constructor args -------------------------- + + private final boolean isLeftOuterJoin; + private final InternalTypeInfo<RowData> leftType; + private final InternalTypeInfo<RowData> rightType; + + /** Field index of the build-side (right) row-time attribute. */ + private final int buildRowtimeIndex; + + private final GeneratedJoinCondition generatedJoinCondition; + + /** + * Per-equi-key flag indicating whether rows with a NULL in that key position must be filtered + * before the join condition runs (SQL semantics: {@code NULL = NULL} is not true). + */ + private final boolean[] filterNullKeys; + + /** + * Timestamp at which the build-side watermark must arrive for the operator to flip from {@code + * LOAD} to JOIN. + */ + private final Long loadCompletedTime; + + /** + * Processing-time idle timeout duration (millis) on build-side watermarks. When configured, the + * operator flips to JOIN if no build-side watermark advance is seen for this duration. + */ + @Nullable private final Long loadCompletedIdleTimeoutMs; + + /** + * State TTL (millis) to clean up any keyed state during JOIN phase. We schedule TTL timers + * maxStateTtlMs ahead and check on minStateTtlMs before scheduling a new timer. This avoids + * rescheduling timers on every state access while still ensuring that keyed state is evicted + * after at most maxStateTtlMs of key inactivity during JOIN phase. If minStateTtlMs is set to + * 0, state TTL is disabled. 
+ */ + private final long minStateTtlMs; + + private final long maxStateTtlMs; + + // -------------------------- transient runtime fields -------------------------- + + private transient JoinConditionWithNullFilters joinCondition; + private transient GenericRowData nullPaddedBuild; + private transient TimestampedCollector<RowData> collector; + + private transient InternalTimerService<String> timerService; + + private transient Phase phase; + + /** + * Processing-time wall clock at which the operator transitioned from {@link Phase#LOAD} to + * {@link Phase#JOIN}. {@code null} while still in LOAD. Used by the TTL handler to reschedule + * state TTL timers that fire too early. + */ + @Nullable private transient Long flipProcTime; + + /** Highest build-side watermark observed. */ + private transient long currentBuildSideWm; + + /** Latest probe-side watermark observed. */ + private transient long currentProbeSideWm; + + /** Non-keyed processing-time idle-flip timer. */ + @Nullable private transient ScheduledFuture<?> idleFlipTimer; + + // -------------------------- keyed state -------------------------- + + /** Build-side table as multi-set: row → reference count. */ + private transient MapState<RowData, Long> buildTableState; + + /** Buffer for build-side changes during JOIN to ensure atomic updates. */ + private transient ListState<RowData> buildChangeBuffer; + + /** Build-side watermark to ensure atomic application of build changes during JOIN. */ + private transient ValueState<Long> bufferedAtWmState; + + /** Buffer for probe-side records during LOAD. */ + private transient ListState<RowData> probeBuffer; + + /** Most recently registered TTL timer deadline; used to advance TTL timer. 
*/ + private transient ValueState<Long> ttlExpiryState; + + // -------------------------- operator state -------------------------- + + private transient ListState<String> operatorPhaseState; + + // -------------------------- metrics -------------------------- + + private transient Counter numStateTtlEvictions; + private transient Counter numUnmatchedProbeRecords; + private transient Counter numUnmatchedBuildRetractions; + + private transient SimpleGauge<Long> probeBufferedGauge; + private transient SimpleGauge<Long> buildBufferedGauge; + private transient SimpleGauge<Long> buildWmGauge; + private transient SimpleGauge<Long> probeWmGauge; + private transient SimpleGauge<Long> maxFanOutGauge; + private transient SimpleGauge<Double> avgFanOutGauge; + private transient Gauge<Integer> phaseGauge; + + /** Backing accumulators for the push-model gauges (in-memory, not persisted, best-effort). */ + private transient long probeBufferedCount; + + private transient long buildBufferedCount; + private transient long maxJoinFanOut; + private transient long totalJoinFanOut; + private transient long totalProbeJoins; + + public LateralSnapshotJoinOperator( + boolean isLeftOuterJoin, + InternalTypeInfo<RowData> leftType, + InternalTypeInfo<RowData> rightType, + int buildRowtimeIndex, + GeneratedJoinCondition generatedJoinCondition, + boolean[] filterNullKeys, + Long loadCompletedTime, + @Nullable Long loadCompletedIdleTimeoutMs, + @Nullable Long stateTtlMs) { + this.isLeftOuterJoin = isLeftOuterJoin; + this.leftType = Preconditions.checkNotNull(leftType); + this.rightType = Preconditions.checkNotNull(rightType); + this.buildRowtimeIndex = buildRowtimeIndex; + if (buildRowtimeIndex < 0) { + throw new IllegalArgumentException("buildRowtimeIndex must be non-negative"); + } + this.generatedJoinCondition = Preconditions.checkNotNull(generatedJoinCondition); + this.filterNullKeys = Preconditions.checkNotNull(filterNullKeys); + this.loadCompletedTime = 
Preconditions.checkNotNull(loadCompletedTime); + if (this.loadCompletedTime < 0) { + throw new IllegalArgumentException("loadCompletedTime must be non-negative"); + } + this.loadCompletedIdleTimeoutMs = loadCompletedIdleTimeoutMs; + if (this.loadCompletedIdleTimeoutMs != null && this.loadCompletedIdleTimeoutMs < 0) { + throw new IllegalArgumentException("loadCompletedIdleTimeoutMs must be non-negative"); + } + this.minStateTtlMs = stateTtlMs == null ? 0 : stateTtlMs; + if (this.minStateTtlMs < 0) { + throw new IllegalArgumentException("stateTtlMs must be non-negative"); + } + // maxStateTtlMs is 1.5x of minStateTtlMs + this.maxStateTtlMs = this.minStateTtlMs + this.minStateTtlMs / 2; + } + + // -------------------------- lifecycle -------------------------- + + @Override + public boolean useInterruptibleTimers(ReadableConfig config) { + return true; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + // Operator state only — keyed state and timer services are initialized in open() + operatorPhaseState = + context.getOperatorStateStore() + .getUnionListState( + new ListStateDescriptor<>( + OPERATOR_PHASE_STATE_NAME, StringSerializer.INSTANCE)); + + // any LOAD entry → LOAD; empty (fresh start) → LOAD; else JOIN + boolean phaseStateExists = false; + boolean anyTaskInLoad = false; + for (String phase : operatorPhaseState.get()) { + phaseStateExists = true; + if (Phase.LOAD.name().equals(phase)) { + anyTaskInLoad = true; + break; + } + } + // we start in LOAD phase if no phaseState exists (no savepoint/checkpoint) or any task was + // still in LOAD phase (not all tasks transitioned to JOIN phase). + phase = (!phaseStateExists || anyTaskInLoad) ? Phase.LOAD : Phase.JOIN; + + // When restored into JOIN, anchor flipProcTime on the current wall clock so the TTL + // handler's post-flip grace window restarts from now. + flipProcTime = + phase == Phase.JOIN ? 
getProcessingTimeService().getCurrentProcessingTime() : null; + + currentBuildSideWm = Long.MIN_VALUE; + currentProbeSideWm = Long.MIN_VALUE; + + // Initialize metric counters + probeBufferedCount = 0L; + buildBufferedCount = 0L; + maxJoinFanOut = 0L; + totalJoinFanOut = 0L; + totalProbeJoins = 0L; + } + + @Override + public void open() throws Exception { + super.open(); + + // Setup keyed states + buildTableState = + getRuntimeContext() + .getMapState( + new MapStateDescriptor<>( + BUILD_TABLE_STATE_NAME, rightType, Types.LONG)); + buildChangeBuffer = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + BUILD_CHANGE_BUFFER_STATE_NAME, rightType)); + bufferedAtWmState = + getRuntimeContext() + .getState( + new ValueStateDescriptor<>(BUFFERED_AT_WM_STATE_NAME, Types.LONG)); + probeBuffer = + getRuntimeContext() + .getListState(new ListStateDescriptor<>(PROBE_BUFFER_STATE_NAME, leftType)); + ttlExpiryState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>(TTL_EXPIRY_STATE_NAME, Types.LONG)); + + // Setup timerservice + timerService = getInternalTimerService(TIMER_SERVICE_NAME, StringSerializer.INSTANCE, this); + + // Wrap the codegen'd condition with a null-key filter so SQL semantics are honored for + // equi-keys whose values may be NULL. + final JoinCondition rawCondition = + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); + joinCondition = new JoinConditionWithNullFilters(rawCondition, filterNullKeys, this); + joinCondition.setRuntimeContext(getRuntimeContext()); + joinCondition.open(DefaultOpenContext.INSTANCE); + + // Construct null-padded record for left-outer join + nullPaddedBuild = + isLeftOuterJoin ? 
new GenericRowData(rightType.toRowType().getFieldCount()) : null; + + // Create output collector + collector = new TimestampedCollector<>(output); + + timerService = getInternalTimerService(TIMER_SERVICE_NAME, StringSerializer.INSTANCE, this); + + // Register metrics + final MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + numStateTtlEvictions = metricGroup.counter(M_NUM_STATE_TTL_EVICTIONS); + numUnmatchedProbeRecords = metricGroup.counter(M_NUM_UNMATCHED_PROBE); + numUnmatchedBuildRetractions = metricGroup.counter(M_NUM_UNMATCHED_BUILD_RETRACTIONS); + + // Rebuild the buffer counts from restored keyed state so the gauges reflect the + // records buffered in this subtask after a restore/rescale. + recomputeBufferCountsFromState(); + + probeBufferedGauge = new SimpleGauge<>(probeBufferedCount); + buildBufferedGauge = new SimpleGauge<>(buildBufferedCount); + buildWmGauge = new SimpleGauge<>(currentBuildSideWm); + probeWmGauge = new SimpleGauge<>(currentProbeSideWm); + maxFanOutGauge = new SimpleGauge<>(maxJoinFanOut); + avgFanOutGauge = new SimpleGauge<>(0.0d); + metricGroup.gauge(M_NUM_PROBE_BUFFERED, probeBufferedGauge); + metricGroup.gauge(M_NUM_BUILD_BUFFERED, buildBufferedGauge); + metricGroup.gauge(M_CURRENT_BUILD_WM, buildWmGauge); + metricGroup.gauge(M_CURRENT_PROBE_WM, probeWmGauge); + metricGroup.gauge(M_MAX_JOIN_FAN_OUT, maxFanOutGauge); + metricGroup.gauge(M_AVG_JOIN_FAN_OUT, avgFanOutGauge); + phaseGauge = () -> phase == null ? -1 : phase.ordinal(); + metricGroup.gauge(M_CURRENT_PHASE, phaseGauge); + + // Mark the build-side input (index 1) as permanently idle in the inherited + // combinedWatermark accounting. This operator never forwards build-side WMs nor + // build-side idle status: it absorbs both. + combinedWatermark.updateStatus(1, true); + + // Register the load-completed idle-timeout timer if it is configured. 
+ if (phase == Phase.LOAD && loadCompletedIdleTimeoutMs != null) { + scheduleIdleFlipTimer(); + } + + LOG.info( + "Opened LateralSnapshotJoinOperator: phase={}, leftOuter={}, loadCompletedTime={}, " + + "idleTimeoutMs={}, stateTtlMs=[{}, {}], buildRowtimeIndex={}, " + + "restoredProbeBuffered={}, restoredBuildBuffered={}", + phase, + isLeftOuterJoin, + loadCompletedTime, + loadCompletedIdleTimeoutMs, + minStateTtlMs, + maxStateTtlMs, + buildRowtimeIndex, + probeBufferedCount, + buildBufferedCount); + } + + /** + * Recomputes the in-memory buffer counter ({@link #probeBufferedCount}, {@link + * #buildBufferedCount}) for the corresponding metrics by scanning the restored keyed buffer + * state. + */ + private void recomputeBufferCountsFromState() throws Exception { + final KeyedStateBackend<Object> backend = getKeyedStateBackend(); + + LOG.info("Start recomputing buffered-record metrics from restored keyed state..."); + + probeBufferedCount = 0L; + backend.applyToAllKeys( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ListStateDescriptor<>(PROBE_BUFFER_STATE_NAME, leftType), + (key, state) -> { + for (RowData ignored : state.get()) { + probeBufferedCount++; + } + }); + + buildBufferedCount = 0L; + backend.applyToAllKeys( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ListStateDescriptor<>(BUILD_CHANGE_BUFFER_STATE_NAME, rightType), + (key, state) -> { + for (RowData ignored : state.get()) { + buildBufferedCount++; + } + }); + + LOG.info( + "Finished recomputing buffered-record metrics from restored keyed state: " + + "probeBuffered={}, buildBuffered={}", + probeBufferedCount, + buildBufferedCount); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + operatorPhaseState.update(List.of(phase.name())); + } + + @Override + public void close() throws Exception { + if (idleFlipTimer != null) { + idleFlipTimer.cancel(false); + idleFlipTimer = null; + } + if 
(joinCondition != null) { + joinCondition.close(); + } + super.close(); + } + + // -------------------------- input row processing -------------------------- + + @Override + public void processElement1(StreamRecord<RowData> element) throws Exception { + RowData probe = element.getValue(); + // Apply any buffered build-side changes if the build-side watermark has advanced. + applyBufferedChangesIfReady(); + if (phase == Phase.LOAD) { + probeBuffer.add(probe); + timerService.registerEventTimeTimer(NS_FLIP, FLIP_JOIN_TIMER_TS); + probeBufferedGauge.update(++probeBufferedCount); + } else { + joinProbeRow(probe); + } + refreshStateTtl(); + } + + @Override + public void processElement2(StreamRecord<RowData> element) throws Exception { + RowData build = element.getValue(); + // Apply any buffered build-side changes if the build-side watermark has advanced. + Long bufferedAt = applyBufferedChangesIfReady(); + // Buffer the new change, tagged with the current build-side watermark. + buildChangeBuffer.add(build); + if (bufferedAt == null || bufferedAt != currentBuildSideWm) { + bufferedAtWmState.update(currentBuildSideWm); + } + buildBufferedGauge.update(++buildBufferedCount); + refreshStateTtl(); + } + + // -------------------------- watermark processing -------------------------- + + @Override + public void processWatermark1(Watermark mark) throws Exception { + // Probe-side watermark: held back during LOAD (neither advances the timer service nor is + // forwarded), forwarded during JOIN. + if (phase == Phase.JOIN) { + super.processWatermark1(mark); + } + currentProbeSideWm = Math.max(currentProbeSideWm, mark.getTimestamp()); + probeWmGauge.update(currentProbeSideWm); + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + // Build-side watermark: NEVER forwarded; never advances the timer service. 
+ long ts = mark.getTimestamp(); + currentBuildSideWm = Math.max(currentBuildSideWm, ts); + buildWmGauge.update(currentBuildSideWm); + if (phase == Phase.LOAD) { + if (currentBuildSideWm >= loadCompletedTime) { + // we reached the flip point. Transition to JOIN phase. + transitionToJoinPhase(FlipTrigger.BUILD_WATERMARK); + } else if (loadCompletedIdleTimeoutMs != null) { + // we got a new build-side wm. Reschedule the idle timer (if it was configured) + rescheduleIdleFlipTimer(); + } + } + } + + @Override + protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index) + throws Exception { + if (index == 1) { + // Build-side idle status is absorbed entirely. partial[1] is initialized idle in + // open() and stays that way regardless of source-side toggles, so combined accounting + // is always driven by the probe side alone. + return; + } + if (phase == Phase.LOAD) { + // During LOAD, nothing is emitted downstream — neither watermarks nor status. But we + // do track partial[0]'s idle bit so that, after the flip, the operator has an accurate + // view of the probe-side's idle state. + combinedWatermark.updateStatus(0, watermarkStatus.isIdle()); + return; + } + super.processWatermarkStatus(watermarkStatus, index); + } + + // -------------------------- timers processing -------------------------- + + @Override + public void onEventTime(InternalTimer<RowData, String> timer) throws Exception { + String ns = timer.getNamespace(); + // the NS_FLIP timers are fired when the operator transitions from LOAD to JOIN phase. + if (NS_FLIP.equals(ns)) { + // If a recovery happened before, there might be buffered build-side changes. + // Apply them before joining the buffered probe-side records. + applyBufferedChanges(); + // Join each buffered probe row. 
+ long drained = 0; + for (RowData p : probeBuffer.get()) { + joinProbeRow(p); + drained++; + } + probeBuffer.clear(); + probeBufferedCount = Math.max(0, probeBufferedCount - drained); + probeBufferedGauge.update(probeBufferedCount); + } + } + + @Override + public void onProcessingTime(InternalTimer<RowData, String> timer) throws Exception { + // TTL timers run on processing time so semantics match Flink's standard StateTtlConfig. + if (!NS_TTL.equals(timer.getNamespace())) { + return; + } + if (minStateTtlMs == 0) { + // TTL wasn't configured and shouldn't have registered any timers. + return; + } + Long deadline = ttlExpiryState.value(); + if (deadline == null || timer.getTimestamp() != deadline) { + return; // stale timer fire + } + long now = getProcessingTimeService().getCurrentProcessingTime(); + // check if we need to reschedule the ttl timer. This is necessary if + // a) we're still in LOAD phase, or + // b) if we're in JOIN but the flip happened less than stateTtlMs ago. + if (phase == Phase.LOAD || (flipProcTime != null && now < flipProcTime + minStateTtlMs)) { + // set the new TTL timer maxStateTtlMs ahead + long newDeadline = + phase == Phase.LOAD ? now + maxStateTtlMs : flipProcTime + maxStateTtlMs; + timerService.registerProcessingTimeTimer(NS_TTL, newDeadline); + ttlExpiryState.update(newDeadline); + return; + } + // clear all per-key state + buildTableState.clear(); + // Discount any build-side changes still buffered for this key before clearing. + long evictedBuffered = 0; + for (RowData ignored : buildChangeBuffer.get()) { + evictedBuffered++; + } + buildBufferedCount = Math.max(0, buildBufferedCount - evictedBuffered); + buildBufferedGauge.update(buildBufferedCount); + buildChangeBuffer.clear(); + bufferedAtWmState.clear(); + ttlExpiryState.clear(); + // probeBuffer should be empty because we are in JOIN phase, but clear out just in case. + // hence, we're also not updating the probeBufferedGauge. 
+ probeBuffer.clear(); + numStateTtlEvictions.inc(); + } + + /** + * Registers the load-completion idle-timeout timer. No-op when the timeout is not configured. + */ + private void scheduleIdleFlipTimer() { + if (loadCompletedIdleTimeoutMs == null) { + return; + } + long deadline = + getProcessingTimeService().getCurrentProcessingTime() + loadCompletedIdleTimeoutMs; + idleFlipTimer = + getProcessingTimeService() + .registerTimer( + deadline, t -> transitionToJoinPhase(FlipTrigger.IDLE_TIMEOUT)); + } + + /** Updates the idle flip timer. */ + private void rescheduleIdleFlipTimer() { + cancelIdleFlipTimer(); + scheduleIdleFlipTimer(); + } + + /** Deactivates the currently registered idle flip timer. */ + private void cancelIdleFlipTimer() { + if (idleFlipTimer != null) { + idleFlipTimer.cancel(false); + idleFlipTimer = null; + } + } + + // -------------------------- core logic -------------------------- + + /** + * Transition from LOAD to JOIN. + * + * <p><b>Invocation context</b>: This method runs in a NON-KEYED context. The two callers are + * (a) {@link #processWatermark2}, which is invoked by the framework without a key context, and + * (b) {@link #idleFlipTimer}, which fires from the operator-level processing-time service. + * Therefore {@code flipToJoinPhase()} itself must not access keyed state. Per-key work (the + * buffered probe flush) is delegated to {@link #onEventTime} via {@code timeServiceManager + * .advanceWatermark(...)} below — that path establishes the correct key context for each fired + * timer before invoking the callback. + */ + private void transitionToJoinPhase(FlipTrigger trigger) throws Exception { + if (phase == Phase.JOIN) { + return; + } + LOG.info( + "Flipping phase LOAD -> JOIN (trigger={}): buildWm={}, loadCompletedTime={}. 
" + + "Joining buffered probe records...", + trigger, + currentBuildSideWm, + loadCompletedTime); + phase = Phase.JOIN; + // Record the flip wall-clock so the TTL handler can grant a grace period of + // stateTtlMs after the flip before any build-only key becomes eligible for eviction. + // Without this anchor, keys loaded long before the flip would be evicted as soon as the + // first TTL fire after the flip happens. + flipProcTime = getProcessingTimeService().getCurrentProcessingTime(); + // disable idle flip timer + cancelIdleFlipTimer(); + // Fire all per-key flip timers (TS=1) so any probes buffered during LOAD are joined. + long advanceTo = Math.max(currentProbeSideWm, FLIP_JOIN_TIMER_TS); + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(new Watermark(advanceTo)); + } + // Track the idle status which is cleared by updateWatermark() + boolean probeIdleAtFlip = combinedWatermark.isIdle(); + // Emit the last observed probe-side wm downstream + if (currentProbeSideWm != Long.MIN_VALUE) { + combinedWatermark.updateWatermark(0, currentProbeSideWm); + output.emitWatermark(new Watermark(currentProbeSideWm)); + // reset the idle status + combinedWatermark.updateStatus(0, probeIdleAtFlip); + } + // If the probe-side was idle at flip time, propagate the idle status downstream + if (probeIdleAtFlip) { + output.emitWatermarkStatus(WatermarkStatus.IDLE); + } + LOG.info( + "Completed flip to JOIN phase (trigger={}): buffered probe records joined and " + + "drained, emittedProbeWm={}", + trigger, + currentProbeSideWm == Long.MIN_VALUE ? "none" : currentProbeSideWm); + } + + /** + * Joins a probe-side row against the current build-side table and applies the join predicate. + * Returns a null-padded result if the row doesn't match any build-side row and this is a LEFT + * OUTER join. + */ + private void joinProbeRow(RowData probe) throws Exception { + boolean matched = false; + // Number of result rows emitted for this probe row (the join fan-out). 
+ long fanOut = 0; + for (Map.Entry<RowData, Long> entry : buildTableState.entries()) { + RowData buildRow = entry.getKey(); + long count = entry.getValue(); + if (joinCondition.apply(probe, buildRow)) { Review Comment: So far, I've kept the operator generic to avoid blowing up the complexity even more. In the case of a PK-FK join, the primary-key columns should be part of the key, which means that `buildTableState` should hold just a single row per key. Or am I misunderstanding what you are proposing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
