This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e48cc72 [AURON #1856] Introduce Flink expression-level converter 
framework (#2146)
9e48cc72 is described below

commit 9e48cc72a2eeede8e298df2f61db784f711283f0
Author: Weiqing Yang <[email protected]>
AuthorDate: Wed Apr 1 22:38:32 2026 -0700

    [AURON #1856] Introduce Flink expression-level converter framework (#2146)
    
    # Which issue does this PR close?
    
      Closes #1856
    
    # Rationale for this change
    
    Auron's Flink integration has the data exchange layer in place (Arrow
    Writer #1850, Arrow Reader #1851) but lacks the conversion
    infrastructure — the machinery that decides which Flink expressions and
    aggregates can be converted to native execution and how to translate
    them into Auron's protobuf representation.
    
    # What changes are included in this PR?
    
    Five foundational Java classes in `auron-flink-planner`:
    
      | Class | Role |
      |-------|------|
    | `FlinkNodeConverter<T>` | Generic base interface: `getNodeClass()`,
    `isSupported()`, `convert()` → `PhysicalExprNode` |
    | `FlinkRexNodeConverter` | Sub-interface for Calcite `RexNode`
    expressions |
    | `FlinkAggCallConverter` | Sub-interface for Calcite `AggregateCall`
    aggregates |
    | `FlinkNodeConverterFactory` | Singleton registry with separate
    `rexConverterMap` + `aggConverterMap`, typed registration and fail-safe
    dispatch |
    | `ConverterContext` | Immutable holder for input schema (`RowType`),
    Flink config, Auron config, and classloader |
    
    pom.xml: `flink-core` scope changed from `test` to `provided` (required
    for `ReadableConfig`).
    
    Framework-only — no concrete converter implementations. Follow-up issues
    will add RexLiteral, RexInputRef, RexCall, and aggregate converters.
    Design doc: `docs/PR-AURON-1856/AURON-1856-DESIGN.md`
    Review helper: `docs/reviewhelper/AURON-1856/01-converter-framework.md`
    # Are there any user-facing changes?
    
    No.
    
    # How was this patch tested?
    
    8 unit tests. Checkstyle: 0 violations.
---
 auron-flink-extension/auron-flink-planner/pom.xml  |   2 +-
 .../table/planner/converter/ConverterContext.java  |  80 +++++++++
 .../planner/converter/FlinkAggCallConverter.java   |  35 ++++
 .../planner/converter/FlinkNodeConverter.java      |  64 +++++++
 .../converter/FlinkNodeConverterFactory.java       | 170 ++++++++++++++++++
 .../planner/converter/FlinkRexNodeConverter.java   |  37 ++++
 .../converter/FlinkNodeConverterFactoryTest.java   | 199 +++++++++++++++++++++
 7 files changed, 586 insertions(+), 1 deletion(-)

diff --git a/auron-flink-extension/auron-flink-planner/pom.xml 
b/auron-flink-extension/auron-flink-planner/pom.xml
index 268418eb..bce3ae82 100644
--- a/auron-flink-extension/auron-flink-planner/pom.xml
+++ b/auron-flink-extension/auron-flink-planner/pom.xml
@@ -201,7 +201,7 @@
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-core</artifactId>
       <version>${flink.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
diff --git 
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
new file mode 100644
index 00000000..c19d1a5c
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import java.util.Objects;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Provides shared state to {@link FlinkNodeConverter} implementations during 
conversion.
+ *
+ * <p>Carries the input schema, configuration, and classloader needed for 
type-aware conversion
+ * of Flink expressions and aggregate calls.
+ */
+public class ConverterContext {
+
+    private final ReadableConfig tableConfig;
+    private final AuronConfiguration auronConfiguration;
+    private final ClassLoader classLoader;
+    private final RowType inputType;
+
+    /**
+     * Creates a new converter context.
+     *
+     * @param tableConfig Flink table-level configuration
+     * @param auronConfiguration Auron-specific configuration, may be {@code 
null}
+     * @param classLoader classloader for the current Flink context
+     * @param inputType input schema of the node being converted
+     */
+    public ConverterContext(
+            ReadableConfig tableConfig,
+            AuronConfiguration auronConfiguration,
+            ClassLoader classLoader,
+            RowType inputType) {
+        this.tableConfig = Objects.requireNonNull(tableConfig, "tableConfig 
must not be null");
+        this.auronConfiguration = auronConfiguration;
+        this.classLoader = Objects.requireNonNull(classLoader, "classLoader 
must not be null");
+        this.inputType = Objects.requireNonNull(inputType, "inputType must not 
be null");
+    }
+
+    /** Returns the Flink table-level configuration. */
+    public ReadableConfig getTableConfig() {
+        return tableConfig;
+    }
+
+    /** Returns the Auron-specific configuration, or {@code null} if not 
provided. */
+    public AuronConfiguration getAuronConfiguration() {
+        return auronConfiguration;
+    }
+
+    /** Returns the classloader for the current Flink context. */
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    /**
+     * Returns the input schema of the node being converted.
+     *
+     * <p>Converters use this to resolve {@code RexInputRef} column references 
to concrete types,
+     * check type support, and determine if casts are needed.
+     */
+    public RowType getInputType() {
+        return inputType;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
new file mode 100644
index 00000000..7ae947ac
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+/**
+ * Converts a Calcite {@link AggregateCall} to an Auron native
+ * {@link org.apache.auron.protobuf.PhysicalExprNode} (wrapping a
+ * {@code PhysicalAggExprNode}).
+ *
+ * <p>An {@code AggregateCall} represents an aggregate function invocation 
(e.g., SUM, COUNT, MAX)
+ * with its argument references, return type, and distinctness. The converter 
translates this into
+ * a {@code PhysicalAggExprNode} containing the aggregate function type, 
converted child
+ * expressions, and return type.
+ *
+ * <p>Note: {@code AggregateCall} internally references input columns by 
index. The converter uses
+ * {@link ConverterContext#getInputType()} to resolve these indices to 
concrete types for type
+ * checking and cast insertion.
+ */
+public interface FlinkAggCallConverter extends 
FlinkNodeConverter<AggregateCall> {}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
new file mode 100644
index 00000000..8e6c76af
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.auron.protobuf.PhysicalExprNode;
+
+/**
+ * Base interface for converting Flink plan elements to Auron native {@link 
PhysicalExprNode}
+ * representations.
+ *
+ * <p>This interface is parameterized by the input type to support different 
categories of plan
+ * elements. Two sub-interfaces are provided:
+ * <ul>
+ *   <li>{@link FlinkRexNodeConverter} for Calcite {@code RexNode} expressions
+ *   <li>{@link FlinkAggCallConverter} for Calcite {@code AggregateCall} 
aggregates
+ * </ul>
+ *
+ * @param <T> the type of plan element this converter handles
+ */
+public interface FlinkNodeConverter<T> {
+
+    /**
+     * Returns the concrete class this converter handles.
+     *
+     * <p>Used by {@link FlinkNodeConverterFactory} for lookup dispatch.
+     */
+    Class<? extends T> getNodeClass();
+
+    /**
+     * Checks whether the given element can be converted to native execution.
+     *
+     * <p>A converter may decline based on unsupported types, operand 
combinations, or
+     * configuration. This method must not have side effects.
+     *
+     * @param node the plan element to check
+     * @param context shared conversion state (input schema, configuration)
+     * @return {@code true} if the element can be converted
+     */
+    boolean isSupported(T node, ConverterContext context);
+
+    /**
+     * Converts the given element to a native {@link PhysicalExprNode}.
+     *
+     * @param node the plan element to convert
+     * @param context shared conversion state (input schema, configuration)
+     * @return the native expression representation
+     * @throws IllegalArgumentException if the element type does not match 
{@link #getNodeClass()}
+     */
+    PhysicalExprNode convert(T node, ConverterContext context);
+}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
new file mode 100644
index 00000000..a8610135
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton registry of {@link FlinkNodeConverter} instances. Dispatches 
conversion requests to
+ * the appropriate converter based on the input element's class.
+ *
+ * <p>Maintains separate registries for different converter categories:
+ * <ul>
+ *   <li>{@code rexConverterMap} for {@link FlinkRexNodeConverter} instances
+ *       (keyed by {@code RexNode} subclass)
+ *   <li>{@code aggConverterMap} for {@link FlinkAggCallConverter} instances
+ *       (keyed by {@code AggregateCall} class)
+ * </ul>
+ *
+ * <p>Usage:
+ * <pre>
+ *   FlinkNodeConverterFactory factory = 
FlinkNodeConverterFactory.getInstance();
+ *   // Convert a RexNode expression
+ *   Optional&lt;PhysicalExprNode&gt; result = factory.convertRexNode(rexNode, 
context);
+ *   // Convert an AggregateCall
+ *   Optional&lt;PhysicalExprNode&gt; aggResult = 
factory.convertAggCall(aggCall, context);
+ * </pre>
+ */
+public class FlinkNodeConverterFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkNodeConverterFactory.class);
+
+    private static final FlinkNodeConverterFactory INSTANCE = new 
FlinkNodeConverterFactory();
+
+    private final Map<Class<? extends RexNode>, FlinkRexNodeConverter> 
rexConverterMap;
+    private final Map<Class<? extends AggregateCall>, FlinkAggCallConverter> 
aggConverterMap;
+
+    // Package-private for test isolation (tests create fresh instances)
+    FlinkNodeConverterFactory() {
+        this.rexConverterMap = new HashMap<>();
+        this.aggConverterMap = new HashMap<>();
+    }
+
+    /** Returns the singleton instance. */
+    public static FlinkNodeConverterFactory getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Registers a {@link FlinkRexNodeConverter} for its declared {@code 
RexNode} subclass.
+     *
+     * @param converter the converter to register
+     * @throws IllegalArgumentException if a converter is already registered 
for the same class
+     */
+    public void registerRexConverter(FlinkRexNodeConverter converter) {
+        Class<? extends RexNode> nodeClass = converter.getNodeClass();
+        if (rexConverterMap.containsKey(nodeClass)) {
+            throw new IllegalArgumentException("Duplicate RexNode converter 
for " + nodeClass.getName());
+        }
+        rexConverterMap.put(nodeClass, converter);
+    }
+
+    /**
+     * Registers a {@link FlinkAggCallConverter} for its declared {@code 
AggregateCall} class.
+     *
+     * @param converter the converter to register
+     * @throws IllegalArgumentException if a converter is already registered 
for the same class
+     */
+    public void registerAggConverter(FlinkAggCallConverter converter) {
+        Class<? extends AggregateCall> nodeClass = converter.getNodeClass();
+        if (aggConverterMap.containsKey(nodeClass)) {
+            throw new IllegalArgumentException("Duplicate AggregateCall 
converter for " + nodeClass.getName());
+        }
+        aggConverterMap.put(nodeClass, converter);
+    }
+
+    /**
+     * Attempts to convert the given {@link RexNode} to a native {@link 
PhysicalExprNode}.
+     *
+     * <p>Returns the native expression if a matching converter exists and 
supports the node.
+     * Returns empty if no converter is registered, the converter does not 
support the node,
+     * or conversion fails (fail-safe).
+     *
+     * @param node the RexNode to convert
+     * @param context shared conversion state
+     * @return the converted expression, or empty
+     */
+    public Optional<PhysicalExprNode> convertRexNode(RexNode node, 
ConverterContext context) {
+        FlinkRexNodeConverter converter = rexConverterMap.get(node.getClass());
+        if (converter == null) {
+            return Optional.empty();
+        }
+        if (!converter.isSupported(node, context)) {
+            return Optional.empty();
+        }
+        try {
+            return Optional.of(converter.convert(node, context));
+        } catch (Exception e) {
+            LOG.warn("RexNode conversion failed for {}", 
node.getClass().getName(), e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Attempts to convert the given {@link AggregateCall} to a native {@link 
PhysicalExprNode}.
+     *
+     * <p>Returns the native expression if a matching converter exists and 
supports the call.
+     * Returns empty if no converter is registered, the converter does not 
support the call,
+     * or conversion fails (fail-safe).
+     *
+     * @param aggCall the AggregateCall to convert
+     * @param context shared conversion state
+     * @return the converted expression, or empty
+     */
+    public Optional<PhysicalExprNode> convertAggCall(AggregateCall aggCall, 
ConverterContext context) {
+        FlinkAggCallConverter converter = 
aggConverterMap.get(aggCall.getClass());
+        if (converter == null) {
+            return Optional.empty();
+        }
+        if (!converter.isSupported(aggCall, context)) {
+            return Optional.empty();
+        }
+        try {
+            return Optional.of(converter.convert(aggCall, context));
+        } catch (Exception e) {
+            LOG.warn(
+                    "AggregateCall conversion failed for {}", 
aggCall.getClass().getName(), e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Returns the converter registered for the given element's class, if any.
+     *
+     * <p>Dispatches by type hierarchy: checks {@code RexNode} converters 
first, then
+     * {@code AggregateCall} converters.
+     *
+     * @param nodeClass the class to look up
+     * @return the matching converter, or empty
+     */
+    @SuppressWarnings("unchecked")
+    public Optional<FlinkNodeConverter<?>> getConverter(Class<?> nodeClass) {
+        if (RexNode.class.isAssignableFrom(nodeClass)) {
+            return Optional.ofNullable(rexConverterMap.get((Class<? extends 
RexNode>) nodeClass));
+        } else if (AggregateCall.class.isAssignableFrom(nodeClass)) {
+            return Optional.ofNullable(aggConverterMap.get((Class<? extends 
AggregateCall>) nodeClass));
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
new file mode 100644
index 00000000..9778ae01
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Converts a Calcite {@link RexNode} expression to an Auron native
+ * {@link org.apache.auron.protobuf.PhysicalExprNode}.
+ *
+ * <p>Implementations handle specific {@code RexNode} subtypes:
+ * <ul>
+ *   <li>{@code RexLiteral} — scalar literal values
+ *   <li>{@code RexInputRef} — column references (resolved via
+ *       {@link ConverterContext#getInputType()})
+ *   <li>{@code RexCall} — function/operator calls (arithmetic, comparison, 
CAST, etc.)
+ *   <li>{@code RexFieldAccess} — nested field access
+ * </ul>
+ *
+ * <p>RexNode converters are reusable across operator types — the same {@code 
RexInputRef}
+ * converter works for Calc projections, Agg grouping expressions, and future 
operators.
+ */
+public interface FlinkRexNodeConverter extends FlinkNodeConverter<RexNode> {}
diff --git 
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
new file mode 100644
index 00000000..ac645174
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Optional;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link FlinkNodeConverterFactory}. */
+class FlinkNodeConverterFactoryTest {
+
+    private static final RelDataTypeFactory TYPE_FACTORY = new 
JavaTypeFactoryImpl();
+    private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+    private FlinkNodeConverterFactory factory;
+    private ConverterContext context;
+    private RexLiteral testLiteral;
+    private AggregateCall testAggCall;
+
+    @BeforeEach
+    void setUp() {
+        factory = new FlinkNodeConverterFactory();
+        context =
+                new ConverterContext(new Configuration(), null, 
getClass().getClassLoader(), RowType.of(new IntType()));
+        testLiteral = 
REX_BUILDER.makeExactLiteral(java.math.BigDecimal.valueOf(42));
+        testAggCall = AggregateCall.create(
+                SqlStdOperatorTable.COUNT,
+                false,
+                false,
+                false,
+                java.util.Collections.emptyList(),
+                -1,
+                null,
+                org.apache.calcite.rel.RelCollations.EMPTY,
+                TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT),
+                "cnt");
+    }
+
+    @Test
+    void testRexConverterDispatch() {
+        PhysicalExprNode expected = PhysicalExprNode.newBuilder().build();
+        factory.registerRexConverter(new StubRexNodeConverter(true, expected));
+
+        Optional<PhysicalExprNode> result = 
factory.convertRexNode(testLiteral, context);
+        assertTrue(result.isPresent());
+        assertEquals(expected, result.get());
+    }
+
+    @Test
+    void testAggConverterDispatch() {
+        PhysicalExprNode expected = PhysicalExprNode.newBuilder().build();
+        factory.registerAggConverter(new StubAggCallConverter(true, expected));
+
+        Optional<PhysicalExprNode> result = 
factory.convertAggCall(testAggCall, context);
+        assertTrue(result.isPresent());
+        assertEquals(expected, result.get());
+    }
+
+    @Test
+    void testUnsupportedRexPassthrough() {
+        factory.registerRexConverter(new StubRexNodeConverter(false, null));
+
+        Optional<PhysicalExprNode> result = 
factory.convertRexNode(testLiteral, context);
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    void testConversionFailureFallback() {
+        factory.registerRexConverter(new StubRexNodeConverter(true, null) {
+            @Override
+            public PhysicalExprNode convert(RexNode node, ConverterContext 
context) {
+                throw new RuntimeException("conversion error");
+            }
+        });
+
+        Optional<PhysicalExprNode> result = 
factory.convertRexNode(testLiteral, context);
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    void testDuplicateRexConverterRejected() {
+        factory.registerRexConverter(new StubRexNodeConverter(true, null));
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> factory.registerRexConverter(new 
StubRexNodeConverter(true, null)));
+    }
+
+    @Test
+    void testGetConverterByClass() {
+        StubRexNodeConverter converter = new StubRexNodeConverter(true, null);
+        factory.registerRexConverter(converter);
+
+        Optional<FlinkNodeConverter<?>> found = 
factory.getConverter(RexLiteral.class);
+        assertTrue(found.isPresent());
+        assertEquals(converter, found.get());
+    }
+
+    @Test
+    void testGetConverterByClassAgg() {
+        StubAggCallConverter converter = new StubAggCallConverter(true, null);
+        factory.registerAggConverter(converter);
+
+        Optional<FlinkNodeConverter<?>> found = 
factory.getConverter(AggregateCall.class);
+        assertTrue(found.isPresent());
+        assertEquals(converter, found.get());
+    }
+
+    @Test
+    void testGetConverterAbsent() {
+        Optional<FlinkNodeConverter<?>> found = 
factory.getConverter(RexLiteral.class);
+        assertFalse(found.isPresent());
+    }
+
+    // ---- Test stubs ----
+
+    /** Stub FlinkRexNodeConverter for testing. */
+    private static class StubRexNodeConverter implements FlinkRexNodeConverter 
{
+        private final boolean supported;
+        private final PhysicalExprNode result;
+
+        StubRexNodeConverter(boolean supported, PhysicalExprNode result) {
+            this.supported = supported;
+            this.result = result;
+        }
+
+        @Override
+        public Class<? extends RexNode> getNodeClass() {
+            return RexLiteral.class;
+        }
+
+        @Override
+        public boolean isSupported(RexNode node, ConverterContext context) {
+            return supported;
+        }
+
+        @Override
+        public PhysicalExprNode convert(RexNode node, ConverterContext 
context) {
+            return result;
+        }
+    }
+
+    /** Stub FlinkAggCallConverter for testing. */
+    private static class StubAggCallConverter implements FlinkAggCallConverter 
{
+        private final boolean supported;
+        private final PhysicalExprNode result;
+
+        StubAggCallConverter(boolean supported, PhysicalExprNode result) {
+            this.supported = supported;
+            this.result = result;
+        }
+
+        @Override
+        public Class<? extends AggregateCall> getNodeClass() {
+            return AggregateCall.class;
+        }
+
+        @Override
+        public boolean isSupported(AggregateCall node, ConverterContext 
context) {
+            return supported;
+        }
+
+        @Override
+        public PhysicalExprNode convert(AggregateCall node, ConverterContext 
context) {
+            return result;
+        }
+    }
+}

Reply via email to