This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 03f4f797953 [SQL] Support positional parameters (#38880)
03f4f797953 is described below

commit 03f4f797953c676417753a92eadd7191054bc0d8
Author: Yi Hu <[email protected]>
AuthorDate: Thu Jun 11 15:58:37 2026 -0400

    [SQL] Support positional parameters (#38880)
---
 .../extensions/sql/impl/CalciteQueryPlanner.java   | 93 +++++++++++++++++++---
 .../{BeamSqlAliasTest => BeamSqlAliasTest.java}    | 12 +--
 .../extensions/sql/BeamSqlDslParametersTest.java   | 78 ++++++++++++++++++
 3 files changed, 168 insertions(+), 15 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 606a3c5f71a..aa6f4d12187 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -51,6 +51,11 @@ import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.Me
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataProvider;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexBuilder;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexDynamicParam;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rex.RexShuttle;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
 import 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperatorTable;
@@ -180,8 +185,8 @@ public class CalciteQueryPlanner implements QueryPlanner {
   public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters 
queryParameters)
       throws ParseException, SqlConversionException {
     Preconditions.checkArgument(
-        queryParameters.getKind() == Kind.NONE,
-        "Beam SQL Calcite dialect does not yet support query parameters.");
+        queryParameters.getKind() == Kind.NONE || queryParameters.getKind() == 
Kind.POSITIONAL,
+        "Beam SQL Calcite dialect only supports positional query parameters.");
     BeamRelNode beamRelNode;
     try {
       SqlNode parsed = planner.parse(sqlStatement);
@@ -191,28 +196,35 @@ public class CalciteQueryPlanner implements QueryPlanner {
 
       // root of original logical plan
       RelRoot root = planner.rel(validated);
+      RelNode relNode = root.rel;
+      if (queryParameters.getKind() == Kind.POSITIONAL) {
+        relNode =
+            bindParameters(
+                relNode,
+                new ParameterBinder(root.rel.getCluster().getRexBuilder(), 
queryParameters));
+      }
       LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
       RelTraitSet desiredTraits =
-          root.rel
+          relNode
               .getTraitSet()
               .replace(BeamLogicalConvention.INSTANCE)
               .replace(root.collation)
               .simplify();
       // beam physical plan
-      root.rel
+      relNode
           .getCluster()
           .setMetadataProvider(
               ChainedRelMetadataProvider.of(
                   ImmutableList.of(
                       NonCumulativeCostImpl.SOURCE,
                       RelMdNodeStats.SOURCE,
-                      root.rel.getCluster().getMetadataProvider())));
+                      relNode.getCluster().getMetadataProvider())));
 
-      
root.rel.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
+      
relNode.getCluster().setMetadataQuerySupplier(BeamRelMetadataQuery::instance);
       RelMetadataQuery.THREAD_PROVIDERS.set(
-          
JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
-      root.rel.getCluster().invalidateMetadataQuery();
-      beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, 
root.rel);
+          
JaninoRelMetadataProvider.of(relNode.getCluster().getMetadataProvider()));
+      relNode.getCluster().invalidateMetadataQuery();
+      beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, relNode);
       LOG.info("BEAMPlan>\n{}", BeamSqlRelUtils.explainLazily(beamRelNode));
     } catch (RelConversionException | CannotPlanException e) {
       throw new SqlConversionException(
@@ -225,6 +237,15 @@ public class CalciteQueryPlanner implements QueryPlanner {
     return beamRelNode;
   }
 
+  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+    RelNode newRel = rel.accept(binder);
+    java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+    for (RelNode input : newRel.getInputs()) {
+      newInputs.add(bindParameters(input, binder));
+    }
+    return newRel.copy(newRel.getTraitSet(), newInputs);
+  }
+
   // It needs to be public so that the generated code in Calcite can access it.
   public static class NonCumulativeCostImpl
       implements MetadataHandler<BuiltInMetadata.NonCumulativeCost> {
@@ -265,4 +286,58 @@ public class CalciteQueryPlanner implements QueryPlanner {
       return ((BeamRelNode) 
rel).beamComputeSelfCost(rel.getCluster().getPlanner(), bmq);
     }
   }
+
+  private static class ParameterBinder extends RexShuttle {
+    private final RexBuilder rexBuilder;
+    private final List<?> positionalParams;
+
+    ParameterBinder(RexBuilder rexBuilder, QueryParameters params) {
+      this.rexBuilder = rexBuilder;
+      this.positionalParams = params.getKind() == Kind.POSITIONAL ? 
params.positional() : null;
+    }
+
+    @Override
+    public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {
+      if (positionalParams != null) {
+        int index = dynamicParam.getIndex();
+        if (index < 0 || index >= positionalParams.size()) {
+          throw new IllegalArgumentException(
+              "Index out of bounds for positional parameter: " + index);
+        }
+        Object val = positionalParams.get(index);
+        return makeLiteral(cleanValue(val), dynamicParam.getType());
+      }
+      return super.visitDynamicParam(dynamicParam);
+    }
+
+    private RexNode makeLiteral(Object val, RelDataType type) {
+      if (val == null) {
+        return rexBuilder.makeNullLiteral(type);
+      }
+      return rexBuilder.makeLiteral(val, type, true);
+    }
+
+    @SuppressWarnings("JavaUtilDate") // explicit java.util.Date support
+    private Object cleanValue(Object value) {
+      if (value instanceof org.joda.time.ReadableInstant) {
+        return ((org.joda.time.ReadableInstant) value).getMillis();
+      }
+      if (value instanceof java.time.LocalDate) {
+        return (int) ((java.time.LocalDate) value).toEpochDay();
+      }
+      if (value instanceof java.time.LocalTime) {
+        return (int) (((java.time.LocalTime) value).toNanoOfDay() / 
1_000_000L);
+      }
+      if (value instanceof java.time.LocalDateTime) {
+        return ((java.time.LocalDateTime) 
value).toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
+      }
+      if (value instanceof java.sql.Timestamp) {
+        return ((java.sql.Timestamp) value).getTime();
+      }
+      if (value instanceof java.util.Date) {
+        return ((java.util.Date) value).getTime();
+      }
+      return value;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
similarity index 92%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
index 790312b7e75..de3c8e6f301 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
@@ -33,8 +35,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.junit.Rule;
 import org.junit.Test;
-import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature;
-import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 
 public class BeamSqlAliasTest implements Serializable {
 
@@ -42,10 +42,10 @@ public class BeamSqlAliasTest implements Serializable {
 
   @Test
   public void testSqlWithAliasIsNotIgnoredWithOptimizers() {
-    String ID = "id";
-    String EVENT = "event";
+    final String id = "id";
+    final String event = "event";
 
-    Schema inputType = 
Schema.builder().addStringField(ID).addStringField(EVENT).build();
+    Schema inputType = 
Schema.builder().addStringField(id).addStringField(event).build();
 
     String sql =
         "select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + 
"group by event";
@@ -91,4 +91,4 @@ public class BeamSqlAliasTest implements Serializable {
 
     pipeline.run().waitUntilFinish();
   }
-}
\ No newline at end of file
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java
new file mode 100644
index 00000000000..9166fd16e0a
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslParametersTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithoutTimeZone;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for query parameters in Beam SQL. */
+public class BeamSqlDslParametersTest extends BeamSqlDslBase {
+
+  @Test
+  public void testPositionalParameters() {
+    String sql = "SELECT f_int, f_string FROM PCOLLECTION WHERE f_int = ? AND 
f_string = ?";
+
+    PCollection<Row> result =
+        boundedInput1.apply(
+            "testPositionalParameters",
+            SqlTransform.query(sql).withPositionalParameters(Arrays.asList(1, 
"string_row1")));
+
+    Row expectedRow =
+        
Row.withSchema(Schema.builder().addInt32Field("f_int").addStringField("f_string").build())
+            .addValues(1, "string_row1")
+            .build();
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testDateTimeParameters() {
+    String sql =
+        "SELECT f_int FROM PCOLLECTION WHERE f_date = ? AND f_time = ? AND 
f_datetime = ? AND f_timestamp = ?";
+
+    PCollection<Row> result =
+        boundedInput1.apply(
+            "testDateTimeParameters",
+            SqlTransform.query(sql)
+                .withPositionalParameters(
+                    Arrays.asList(
+                        LocalDate.of(2017, 1, 1),
+                        LocalTime.of(1, 1, 3),
+                        LocalDateTime.of(2017, 1, 1, 1, 1, 3),
+                        new Instant(parseTimestampWithoutTimeZone("2017-01-01 
01:01:03")))));
+
+    Row expectedRow =
+        
Row.withSchema(Schema.builder().addInt32Field("f_int").build()).addValues(1).build();
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+
+    pipeline.run();
+  }
+}

Reply via email to