lincoln-lil commented on code in PR #27754:
URL: https://github.com/apache/flink/pull/27754#discussion_r2938823446


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java:
##########
@@ -305,244 +497,106 @@ protected Transformation<RowData> 
translateToPlanInternal(
         return transform;
     }
 
-    private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
-            PlannerBase planner,
-            ExecNodeConfig config,
-            RelOptTable leftTempTable,
-            RelOptTable rightTempTable,
-            Map<Integer, FunctionParam> leftLookupKeys,
-            Map<Integer, FunctionParam> rightLookupKeys,
-            RelBuilder relBuilder,
-            RowType leftStreamType,
-            RowType rightStreamType,
-            RowDataKeySelector leftJoinKeySelector,
-            RowDataKeySelector leftUpsertKeySelector,
-            RowDataKeySelector rightJoinKeySelector,
-            RowDataKeySelector rightUpsertKeySelector,
-            ClassLoader classLoader) {
-
-        DataTypeFactory dataTypeFactory =
-                
ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
-
-        AsyncDeltaJoinRunner leftLookupTableAsyncFunction =
-                createAsyncDeltaJoinRunner(
-                        planner,
-                        config,
-                        classLoader,
-                        dataTypeFactory,
-                        leftTempTable,
-                        rightTempTable,
-                        leftStreamType,
-                        rightStreamType,
-                        leftLookupKeys,
-                        leftJoinKeySelector,
-                        leftUpsertKeySelector,
-                        rightJoinKeySelector,
-                        rightUpsertKeySelector,
-                        false);
-
-        AsyncDeltaJoinRunner rightLookupTableAsyncFunction =
-                createAsyncDeltaJoinRunner(
-                        planner,
-                        config,
-                        classLoader,
-                        dataTypeFactory,
-                        leftTempTable,
-                        rightTempTable,
-                        leftStreamType,
-                        rightStreamType,
-                        rightLookupKeys,
-                        leftJoinKeySelector,
-                        leftUpsertKeySelector,
-                        rightJoinKeySelector,
-                        rightUpsertKeySelector,
-                        true);
-
-        Tuple2<Long, Long> leftRightCacheSize = getCacheSize(config);
-
-        return new StreamingDeltaJoinOperatorFactory(
-                rightLookupTableAsyncFunction,
-                leftLookupTableAsyncFunction,
-                leftJoinKeySelector,
-                rightJoinKeySelector,
-                asyncLookupOptions.asyncTimeout,
-                asyncLookupOptions.asyncBufferCapacity,
-                leftRightCacheSize.f0,
-                leftRightCacheSize.f1,
-                leftStreamType,
-                rightStreamType);
-    }
-
-    @SuppressWarnings("unchecked")
-    private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
+    private static LookupHandlerBase generateLookupHandler(
+            boolean isBinaryLookup,
+            DeltaJoinLookupChain.Node node,
+            Map<Integer, GeneratedFunction<AsyncFunction<RowData, Object>>>
+                    generatedFetcherCollector,
+            DeltaJoinTree deltaJoinTree,
             PlannerBase planner,
-            ExecNodeConfig config,
+            FlinkTypeFactory typeFactory,
             ClassLoader classLoader,
-            DataTypeFactory dataTypeFactory,
-            RelOptTable leftTempTable,
-            RelOptTable rightTempTable,
-            RowType leftStreamSideType,
-            RowType rightStreamSideType,
-            Map<Integer, FunctionParam> lookupKeys,
-            RowDataKeySelector leftJoinKeySelector,
-            RowDataKeySelector leftUpsertKeySelector,
-            RowDataKeySelector rightJoinKeySelector,
-            RowDataKeySelector rightUpsertKeySelector,
-            boolean treatRightAsLookupTable) {
-        RelOptTable lookupTable = treatRightAsLookupTable ? rightTempTable : 
leftTempTable;
-        RowType streamSideType = treatRightAsLookupTable ? leftStreamSideType 
: rightStreamSideType;
-        RowType lookupSideType = treatRightAsLookupTable ? rightStreamSideType 
: leftStreamSideType;
-
+            ExecNodeConfig config) {
+        final int[] sourceInputOrdinals = node.inputTableBinaryInputOrdinals;
+        final int lookupTableOrdinal = node.lookupTableBinaryInputOrdinal;
+        final RowType sourceStreamType =
+                deltaJoinTree.getOutputRowTypeOnNode(sourceInputOrdinals, 
typeFactory);
+
+        final TableSourceTable lookupTable =
+                (TableSourceTable)
+                        node.deltaJoinSpec
+                                .getLookupTable()
+                                .getTemporalTable(planner.getFlinkContext(), 
typeFactory);
+
+        final Map<Integer, LookupJoinUtil.FunctionParam> lookupKeyMap =
+                node.deltaJoinSpec.getLookupKeyMap();
         AsyncTableFunction<?> lookupSideAsyncTableFunction =
-                getUnwrappedAsyncLookupFunction(lookupTable, 
lookupKeys.keySet(), classLoader);
+                getUnwrappedAsyncLookupFunction(lookupTable, 
lookupKeyMap.keySet(), classLoader);
         UserDefinedFunctionHelper.prepareInstance(config, 
lookupSideAsyncTableFunction);
 
-        RowType lookupTableSourceRowType =
+        final RowType lookupTableSourceRowType =
                 FlinkTypeFactory.toLogicalRowType(lookupTable.getRowType());
 
-        RowType resultRowType = (RowType) getOutputType();
+        final RowType lookupResultRowType =
+                combineOutputRowType(
+                        sourceStreamType, lookupTableSourceRowType, 
node.joinType, typeFactory);
 
-        List<FunctionCallUtil.FunctionParam> convertedKeys =
-                
Arrays.stream(LookupJoinUtil.getOrderedLookupKeys(lookupKeys.keySet()))
-                        .mapToObj(lookupKeys::get)
+        List<LookupJoinUtil.FunctionParam> lookupKeysOnInputSide =
+                
Arrays.stream(LookupJoinUtil.getOrderedLookupKeys(lookupKeyMap.keySet()))
+                        .mapToObj(lookupKeyMap::get)
                         .collect(Collectors.toList());
 
         
FunctionCallCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData,
 Object>>
-                lookupSideGeneratedFuncWithType =
+                lookupSideGeneratedFetcherWithType =
                         LookupJoinCodeGenerator.generateAsyncLookupFunction(
                                 config,
                                 classLoader,
-                                dataTypeFactory,
-                                streamSideType,
+                                
unwrapDataTypeFactory(planner.createRelBuilder()),
+                                sourceStreamType,
                                 lookupTableSourceRowType,
-                                resultRowType,
-                                convertedKeys,
+                                lookupResultRowType,
+                                lookupKeysOnInputSide,
                                 lookupSideAsyncTableFunction,
                                 String.join(".", 
lookupTable.getQualifiedName()));
 
-        DataStructureConverter<?, ?> lookupSideFetcherConverter =
-                
DataStructureConverters.getConverter(lookupSideGeneratedFuncWithType.dataType());
-
-        GeneratedResultFuture<TableFunctionResultFuture<RowData>> 
lookupSideGeneratedResultFuture;
-        if (treatRightAsLookupTable) {
-            lookupSideGeneratedResultFuture =
-                    LookupJoinCodeGenerator.generateTableAsyncCollector(
-                            config,
-                            classLoader,
-                            "TableFunctionResultFuture",
-                            streamSideType,
-                            lookupTableSourceRowType,
-                            JavaScalaConversionUtil.toScala(
-                                    
lookupRightTableJoinSpec.getRemainingCondition()));
-        } else {
-            RexBuilder rexBuilder = new RexBuilder(planner.getTypeFactory());
-
-            Optional<RexNode> newCond =
-                    lookupLeftTableJoinSpec
-                            .getRemainingCondition()
-                            .map(
-                                    con ->
-                                            swapInputRefsInCondition(
-                                                    rexBuilder,
-                                                    con,
-                                                    leftStreamSideType,
-                                                    rightStreamSideType));
-            lookupSideGeneratedResultFuture =
-                    LookupJoinCodeGenerator.generateTableAsyncCollector(
-                            config,
-                            classLoader,
-                            "TableFunctionResultFuture",
-                            streamSideType,
-                            lookupTableSourceRowType,
-                            JavaScalaConversionUtil.toScala(newCond));
-        }
+        final RowType lookupSidePassThroughCalcRowType =
+                deltaJoinTree.getOutputRowTypeOnNode(new int[] 
{lookupTableOrdinal}, typeFactory);
 
         GeneratedFunction<FlatMapFunction<RowData, RowData>> 
lookupSideGeneratedCalc = null;
-        if ((treatRightAsLookupTable
-                        && 
lookupRightTableJoinSpec.getProjectionOnTemporalTable().isPresent())
-                || (!treatRightAsLookupTable
-                        && 
lookupLeftTableJoinSpec.getProjectionOnTemporalTable().isPresent())) {
-            // a projection or filter after lookup table
+        if (node.deltaJoinSpec.getProjectionOnTemporalTable().isPresent()) {
+            // a projection or filter after table source scan
             List<RexNode> projectionOnTemporalTable =
-                    treatRightAsLookupTable
-                            ? 
lookupRightTableJoinSpec.getProjectionOnTemporalTable().get()
-                            : 
lookupLeftTableJoinSpec.getProjectionOnTemporalTable().get();
+                    node.deltaJoinSpec.getProjectionOnTemporalTable().get();
             RexNode filterOnTemporalTable =
-                    treatRightAsLookupTable
-                            ? 
lookupRightTableJoinSpec.getFilterOnTemporalTable().orElse(null)
-                            : 
lookupLeftTableJoinSpec.getFilterOnTemporalTable().orElse(null);
+                    node.deltaJoinSpec.getFilterOnTemporalTable().orElse(null);
             lookupSideGeneratedCalc =
                     LookupJoinCodeGenerator.generateCalcMapFunction(
                             config,
-                            planner.getFlinkContext().getClassLoader(),
+                            classLoader,
                             
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                             filterOnTemporalTable,
-                            lookupSideType,
+                            lookupSidePassThroughCalcRowType,
                             lookupTableSourceRowType);
         }
 
-        return new AsyncDeltaJoinRunner(
-                lookupSideGeneratedFuncWithType.tableFunc(),
-                (DataStructureConverter<RowData, Object>) 
lookupSideFetcherConverter,
-                lookupSideGeneratedCalc,
-                lookupSideGeneratedResultFuture,
-                InternalSerializers.create(lookupSideType),
-                leftJoinKeySelector,
-                leftUpsertKeySelector,
-                rightJoinKeySelector,
-                rightUpsertKeySelector,
-                asyncLookupOptions.asyncBufferCapacity,
-                treatRightAsLookupTable,
-                enableCache(config));
-    }
+        
Preconditions.checkState(!generatedFetcherCollector.containsKey(lookupTableOrdinal));
+        generatedFetcherCollector.put(
+                lookupTableOrdinal, 
lookupSideGeneratedFetcherWithType.tableFunc());
+
+        if (isBinaryLookup) {
+            return new BinaryLookupHandler(
+                    TypeConversions.fromLogicalToDataType(sourceStreamType),
+                    lookupSideGeneratedFetcherWithType.dataType(),
+                    
TypeConversions.fromLogicalToDataType(lookupSidePassThroughCalcRowType),
+                    
InternalSerializers.create(lookupSidePassThroughCalcRowType),
+                    lookupSideGeneratedCalc,
+                    node.inputTableBinaryInputOrdinals,
+                    node.lookupTableBinaryInputOrdinal);
+        }
 
-    /**
-     * When swapping the left and right row type, all input references in the 
condition should be
-     * shifted accordingly. Input references that originally pointed to the 
left will now point to
-     * the right, and those that originally pointed to the right will point to 
the left.
-     *
-     * <p>For example, origin left type: [int, double]; origin right type: 
[double, int]; origin
-     * condition: [$1 = $2]. After this shifting, the condition will be [$0 = 
$3].
-     *
-     * <p>Mainly inspired by {@link RelOptUtil.RexInputConverter}.
-     */
-    private RexNode swapInputRefsInCondition(
-            RexBuilder rexBuilder, RexNode condition, RowType leftType, 
RowType rightType) {
-        int leftFieldCount = leftType.getFieldCount();
-        int rightFieldCount = rightType.getFieldCount();
-        int[] adjustments = new int[leftFieldCount + rightFieldCount];
-        // all input references on the left will be shifted to the right by 
`rightFieldCount`
-        Arrays.fill(adjustments, 0, leftFieldCount, rightFieldCount);
-        // all input references on the right will be shifted to the left by 
`leftFieldCount`
-        Arrays.fill(
-                adjustments, leftFieldCount, leftFieldCount + rightFieldCount, 
leftFieldCount * -1);
-
-        RexShuttle converter =
-                new RexShuttle() {
-
-                    @Override
-                    public RexNode visitInputRef(RexInputRef inputRef) {
-                        int srcIndex = inputRef.getIndex();
-                        int destIndex = srcIndex + adjustments[srcIndex];
-                        RelDataType type = inputRef.getType();
-
-                        return rexBuilder.makeInputRef(type, destIndex);
-                    }
-                };
-
-        return condition.accept(converter);
+        throw new IllegalStateException("Support later");

Review Comment:
   Is this a temporary TODO to be addressed in a follow-up PR?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/BinaryLookupHandler.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.deltajoin;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * A lookup handler implementation that performs a binary lookup on a dimension table.
+ *
+ * <p>Unlike {@code CascadedLookupHandler}, the {@link BinaryLookupHandler} only looks up a
+ * single table and is not chained with other {@link DeltaJoinHandlerBase}s. Therefore, it allows
+ * more optimizations than the {@code CascadedLookupHandler}. Specifically:
+ *
+ * <ol>
+ *   <li>There is only a single input row.
+ *   <li>There is no need to deduplicate results using upsert keys.
+ *   <li>There is no need to filter the result with remaining conditions, and 
the lookup results
+ *       will be directly returned.
+ * </ol>
+ */
+public class BinaryLookupHandler extends LookupHandlerBase {
+
+    private static final long serialVersionUID = 1L;
+
+    public BinaryLookupHandler(
+            DataType streamSideType,
+            DataType lookupResultType,
+            DataType lookupSidePassThroughCalcType,
+            RowDataSerializer lookupSidePassThroughCalcRowSerializer,
+            @Nullable GeneratedFunction<FlatMapFunction<RowData, RowData>> 
lookupSideGeneratedCalc,
+            int[] ownedSourceOrdinals,
+            int ownedLookupOrdinal) {
+        super(
+                streamSideType,
+                lookupResultType,
+                lookupSidePassThroughCalcType,
+                lookupSidePassThroughCalcRowSerializer,
+                lookupSideGeneratedCalc,
+                ownedSourceOrdinals,
+                ownedLookupOrdinal,
+                "BinaryLookupHandler");
+    }
+
+    @Override
+    public void setNext(@Nullable DeltaJoinHandlerBase next) {
+        super.setNext(next);
+
+        Preconditions.checkArgument(
+                next == null, "This binary handler should not have a concrete 
handler after it");
+    }
+
+    @Override
+    public void asyncHandle() throws Exception {
+        Collection<RowData> allSourceRowData =
+                
handlerContext.getSharedMultiInputRowDataBuffer().getData(ownedSourceOrdinals);
+
+        Preconditions.checkState(allSourceRowData.size() == 1);
+
+        RowData input = allSourceRowData.stream().findFirst().get();

Review Comment:
   Could we use `allSourceRowData.iterator().next()` here instead, to avoid the overhead of creating a stream just to read the single element?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to