gustavodemorais commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3437217142
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperator.java: ########## @@ -0,0 +1,1038 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.DefaultOpenContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.KeyedStateBackend; +import 
org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.generated.JoinCondition; +import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters; +import org.apache.flink.table.runtime.operators.metrics.SimpleGauge; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +/** + * Stream operator implementing the {@code LATERAL SNAPSHOT} processing-time temporal table join. 
+ * + * <p>The operator runs in two operator-wide phases that progress only forward, LOAD then JOIN: LOAD + * bootstraps the build-side table from its changelog up to {@code loadCompletedTime}, buffering + * probe rows meanwhile; JOIN then continuously joins probe rows against the materialized build-side + * state. How each input is handled depends on the current phase: + * + * <ul> + * <li>Build-side (input2 / right) changes are handled the same way in both phases: they are + * buffered in {@code buildChangeBuffer} and applied lazily to a per-key multi-set in {@code + * buildTableState} on the next per-key access (build- or probe-side) once the build-side + * watermark has advanced past the buffer's tag, or at the flip. The buffered changelog is + * applied in event-time order: changes are sorted by the build-side row-time attribute + * ({@code buildRowtimeIndex}), and for equal row-times retractions ({@code -U}/{@code -D}) + * are applied before accumulations ({@code +U}/{@code +I}). Buffering until the watermark + * passes preserves atomic update visibility across {@code -U}/{@code +U} pairs in JOIN phase. + * <li>Probe-side (input1 / left) records are handled differently per phase. During LOAD they are + * buffered in {@code probeBuffer} until the configured flip point is reached on the + * build-side watermark, at which point a per-key event-time timer drains the buffered probes + * and joins them with the materialized build-side state. During JOIN they are joined + * immediately with the current build-side state. + * </ul> + * + * <p>Watermark forwarding rules: + * + * <ul> + * <li>Build-side watermarks are never forwarded downstream. + * <li>Probe-side watermarks are held back during LOAD and forwarded during JOIN phase. 
+ * </ul> + * + * <p>The flip from LOAD to JOIN phase is triggered by either: + * + * <ul> + * <li>the build-side watermark reaching {@code loadCompletedTime} (event-time gate), or + * <li>the {@code loadCompletedIdleTimeoutMs} processing-time timer firing without any build-side + * watermark advance. + * </ul> + * + * <p>State TTL eviction happens during JOIN phase and is implemented with keyed processing-time + * timers (matching the semantics of Flink's standard {@code StateTtlConfig}). + */ +@Internal +public class LateralSnapshotJoinOperator extends AbstractStreamOperator<RowData> + implements TwoInputStreamOperator<RowData, RowData, RowData>, Triggerable<RowData, String> { + + // -------------------------- static final definitions -------------------------- + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LateralSnapshotJoinOperator.class); + + /** operator state names */ + private static final String OPERATOR_PHASE_STATE_NAME = "lateral-snapshot-phase"; + + /** keyed state names */ + @VisibleForTesting static final String BUILD_TABLE_STATE_NAME = "build-table"; + + @VisibleForTesting static final String BUILD_CHANGE_BUFFER_STATE_NAME = "build-change-buffer"; + @VisibleForTesting static final String BUFFERED_AT_WM_STATE_NAME = "buffered-at-wm"; + @VisibleForTesting static final String PROBE_BUFFER_STATE_NAME = "probe-buffer"; + private static final String TTL_EXPIRY_STATE_NAME = "ttl-expiry"; + + /** timer service and namespace names */ + private static final String TIMER_SERVICE_NAME = "lateral-snapshot-timers"; + + @VisibleForTesting static final String NS_FLIP = "flip"; + @VisibleForTesting static final String NS_TTL = "ttl"; + + /** + * Event-time timestamp at which the per-key {@code probeBuffer} flip join timer is registered. + * Any non-{@code MIN_VALUE} watermark advance fires it. 
+ */ + @VisibleForTesting static final long FLIP_JOIN_TIMER_TS = 1L; + + /** Gauge: probe-side records currently buffered during LOAD. */ + @VisibleForTesting static final String M_NUM_PROBE_BUFFERED = "numProbeSideRecordsBuffered"; + + /** Gauge: build-side changes currently buffered during JOIN. */ + @VisibleForTesting static final String M_NUM_BUILD_BUFFERED = "numBuildSideRecordsBuffered"; + + /** Counter: keys evicted from state by TTL. */ + @VisibleForTesting static final String M_NUM_STATE_TTL_EVICTIONS = "numStateTtlEvictions"; + + /** Gauge: latest build-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_BUILD_WM = "currentBuildSideWatermark"; + + /** Gauge: latest probe-side watermark observed. */ + @VisibleForTesting static final String M_CURRENT_PROBE_WM = "currentProbeSideWatermark"; + + /** Gauge: current phase ordinal, 0 = LOAD, 1 = JOIN. */ + @VisibleForTesting static final String M_CURRENT_PHASE = "currentPhase"; + + /** Gauge: max joined records emitted for a probe-side record. */ + @VisibleForTesting static final String M_MAX_JOIN_FAN_OUT = "maxJoinFanOut"; + + /** Gauge: average joined records emitted per probe-side record. */ + @VisibleForTesting static final String M_AVG_JOIN_FAN_OUT = "avgJoinFanOut"; + + /** Counter: probe-side records without a match. */ + @VisibleForTesting static final String M_NUM_UNMATCHED_PROBE = "numUnmatchedProbeRecords"; + + /** Counter: build-side retractions for a row not present in state. */ + @VisibleForTesting + static final String M_NUM_UNMATCHED_BUILD_RETRACTIONS = "numUnmatchedBuildRetractions"; + + /** Two-phase state machine. */ + public enum Phase { Review Comment: Tiny thing: `Phase` is `public`, but I don't think anything outside this package needs it (the planner doesn't, and the tests share the package) - maybe make it package-private? Or do you already have plans for other users? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
