gustavodemorais commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3436446730
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java: ########## @@ -0,0 +1,1038 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.KeyedStateBackend; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.metrics.SimpleGauge; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +/** + * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join. 
+ * + * <p>The operator runs in two operator-wide phases that progress only forward, LOAD then JOIN: LOAD + * bootstraps the build-side table from its changelog up to {@code loadCompletedTime}, buffering + * probe rows meanwhile; JOIN then continuously joins probe rows against the materialized build-side + * state. How each input is handled depends on the current phase: + * + * <ul> + * <li>Build-side (input2 / right) changes are handled the same way in both phases: they are + * buffered in {@code buildChangeBuffer} and applied lazily to a per-key multi-set in {@code + * buildTableState} on the next per-key access (build- or probe-side) once the build-side + * watermark has advanced past the buffer's tag, or at the flip. The buffered changelog is + * applied in event-time order: changes are sorted by the build-side row-time attribute + * ({@code buildRowtimeIndex}), and for equal row-times retractions ({@code -U}/{@code -D}) + * are applied before accumulations ({@code +U}/{@code +I}). Buffering until the watermark + * passes preserves atomic update visibility across {@code -U}/{@code +U} pairs in JOIN phase. + * <li>Probe-side (input1 / left) records are handled differently per phase. During LOAD they are + * buffered in {@code probeBuffer} until the configured flip point is reached on the + * build-side watermark, at which point a per-key event-time timer drains the buffered probes + * and joins them with the materialized build-side state. During JOIN they are joined + * immediately with the current build-side state. + * </ul> + * + * <p>Watermark forwarding rules: + * + * <ul> + * <li>Build-side watermarks are never forwarded downstream. + * <li>Probe-side watermarks are held back during LOAD and forwarded during JOIN phase. 
+ * </ul> + * + * <p>The flip from LOAD to JOIN phase is triggered by either: + * + * <ul> + * <li>the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or + * <li>the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side + * watermark advance. + * </ul> + * + * <p>State TTL eviction happens during JOIN phase and is implemented with keyed processing-time + * timers (matching the semantics of Flink's standard {@code StateTtlConfig}). + */ +@Internal +public class LateralSnapshotJoinOperator extends AbstractStreamOperator<RowData> + implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, String> { + + // -------------------------- static final definitions -------------------------- + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LateralSnapshotJoinOperator.class); + + /** operator state names */ + private static final String OPERATOR_PHASE_STATE_NAME = "lateral-snapshot-phase"; + + /** keyed state names */ + @VisibleForTesting static final String BUILD_TABLE_STATE_NAME = "build-table"; + + @VisibleForTesting static final String BUILD_CHANGE_BUFFER_STATE_NAME = "build-change-buffer"; + @VisibleForTesting static final String BUFFERED_AT_WM_STATE_NAME = "buffered-at-wm"; + @VisibleForTesting static final String PROBE_BUFFER_STATE_NAME = "probe-buffer"; + private static final String TTL_EXPIRY_STATE_NAME = "ttl-expiry"; + + /** timer service and namespace names */ + private static final String TIMER_SERVICE_NAME = "lateral-snapshot-timers"; + + @VisibleForTesting static final String NS_FLIP = "flip"; + @VisibleForTesting static final String NS_TTL = "ttl"; + + /** + * Event-time timestamp at which the per-key {@code probeBuffer} flip join timer is registered. + * Any non-{@code MIN_VALUE} watermark advance fires it. 
+ */ + @VisibleForTesting static final long FLIP_JOIN_TIMER_TS = 1L; + + /** Gauge: probe-side records currently buffered during LOAD. */ + @VisibleForTesting static final String M_NUM_PROBE_BUFFERED = "numProbeSideRecordsBuffered"; + + /** Gauge: build-side changes currently buffered during JOIN. */ + @VisibleForTesting static final String M_NUM_BUILD_BUFFERED = "numBuildSideRecordsBuffered"; + + /** Counter: keys evicted from state by TTL. */ + @VisibleForTesting static final String M_NUM_STATE_TTL_EVICTIONS = "numStateTtlEvictions"; + + /** Gauge: latest build-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_BUILD_WM = "currentBuildSideWatermark"; + + /** Gauge: latest probe-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_PROBE_WM = "currentProbeSideWatermark"; + + /** Gauge: current phase ordinal, 0 = LOAD, 1 = JOIN. */ + @VisibleForTesting static final String M_CURRENT_PHASE = "currentPhase"; + + /** Gauge: max joined records emitted for a probe-side record. */ + @VisibleForTesting static final String M_MAX_JOIN_FAN_OUT = "maxJoinFanOut"; + + /** Gauge: average joined records emitted per probe-side record. */ + @VisibleForTesting static final String M_AVG_JOIN_FAN_OUT = "avgJoinFanOut"; + + /** Counter: probe-side records without a match. */ + @VisibleForTesting static final String M_NUM_UNMATCHED_PROBE = "numUnmatchedProbeRecords"; + + /** Counter: build-side retractions for a row not present in state. */ + @VisibleForTesting + static final String M_NUM_UNMATCHED_BUILD_RETRACTIONS = "numUnmatchedBuildRetractions"; + + /** Two-phase state machine. */ + public enum Phase { + LOAD, + JOIN + } + + /** What triggered the {@link Phase#LOAD} to {@link Phase#JOIN} flip (for logging). 
*/ + private enum FlipTrigger { + BUILD_WATERMARK, + IDLE_TIMEOUT + } + + // -------------------------- constructor args -------------------------- + + private final boolean isLeftOuterJoin; + private final InternalTypeInfo<RowData> leftType; + private final InternalTypeInfo<RowData> rightType; + + /** Field index of the build-side (right) row-time attribute. */ + private final int buildRowtimeIndex; + + private final GeneratedJoinCondition generatedJoinCondition; + + /** + * Per-equi-key flag indicating whether rows with a NULL in that key position must be filtered + * before the join condition runs (SQL semantics: {@code NULL = NULL} is not true). + */ + private final boolean[] filterNullKeys; + + /** + * Timestamp at which the build-side watermark must arrive for the operator to flip from {@code + * LOAD} to JOIN. + */ + private final Long loadCompletedTime; + + /** + * Processing-time idle timeout duration (millis) on build-side watermarks. When configured, the + * operator flips to JOIN if no build-side watermark advance is seen for this duration. + */ + @Nullable private final Long loadCompletedIdleTimeoutMs; + + /** + * State TTL (millis) to clean up any keyed state during JOIN phase. We schedule TTL timers + * maxStateTtlMs ahead and check on minStateTtlMs before scheduling a new timer. This avoids + * rescheduling timers on every state access while still ensuring that keyed state is evicted + * after at most maxStateTtlMs of key inactivity during JOIN phase. If minStateTtlMs is set to + * 0, state TTL is disabled. 
+ */ + private final long minStateTtlMs; + + private final long maxStateTtlMs; + + // -------------------------- transient runtime fields -------------------------- + + private transient JoinConditionWithNullFilters joinCondition; + private transient GenericRowData nullPaddedBuild; + private transient TimestampedCollector<RowData> collector; + + private transient InternalTimerService<String> timerService; + + private transient Phase phase; + + /** + * Processing-time wall clock at which the operator transitioned from {@link Phase#LOAD} to + * {@link Phase#JOIN}. {@code null} while still in LOAD. Used by the TTL handler to reschedule + * state TTL timers that fire too early. + */ + @Nullable private transient Long flipProcTime; + + /** Highest build-side watermark observed. */ + private transient long currentBuildSideWm; + + /** Latest probe-side watermark observed. */ + private transient long currentProbeSideWm; + + /** Non-keyed processing-time idle-flip timer. */ + @Nullable private transient ScheduledFuture<?> idleFlipTimer; + + // -------------------------- keyed state -------------------------- + + /** Build-side table as multi-set: row → reference count. */ + private transient MapState<RowData, Long> buildTableState; + + /** Buffer for build-side changes during JOIN to ensure atomic updates. */ + private transient ListState<RowData> buildChangeBuffer; + + /** Build-side watermark to ensure atomic application of build changes during JOIN. */ + private transient ValueState<Long> bufferedAtWmState; + + /** Buffer for probe-side records during LOAD. */ + private transient ListState<RowData> probeBuffer; + + /** Most recently registered TTL timer deadline; used to advance TTL timer. */ + private transient ValueState<Long> ttlExpiryState; + + // -------------------------- operator state -------------------------- + + private transient ListState<String> operatorPhaseState; Review Comment: Can we use `ListState<Phase>`? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
