gemini-code-assist[bot] commented on code in PR #38951:
URL: https://github.com/apache/beam/pull/38951#discussion_r3405850465


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -90,11 +98,52 @@ public class CalciteQueryPlanner implements QueryPlanner {
 
   private final Planner planner;
   private final JdbcConnection connection;
+  private final FrameworkConfig config;
+
+  // Cannot be final because of wacky initialization logic
+  private RelOptCluster relOptCluster;
+  private CalciteCatalogReader catalogReader;
+  private RelDataTypeFactory typeFactory;
+  private RelOptPlanner calcitePlanner;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   These fields are completely unused in the class. Storing references to them 
is also unsafe because they are captured from a temporary planner inside 
`Frameworks.withPlanner(...)` which is closed immediately after the block 
execution. Using them later would reference closed/disposed Calcite components. 
They should be removed entirely.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, 
QueryParameters queryPa
                 relNode,
                 new ParameterBinder(root.rel.getCluster().getRexBuilder(), 
queryParameters));
       }
-      LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+      return convertToBeamRel(relNode, queryParameters);
+    } catch (RelConversionException | CannotPlanException e) {
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      throw new ParseException(String.format("Unable to parse query %s", 
sqlStatement), e);
+    } finally {
+      planner.close();
+    }
+  }
+
+  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+    RelNode newRel = rel.accept(binder);
+    java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+    for (RelNode input : newRel.getInputs()) {
+      newInputs.add(bindParameters(input, binder));
+    }
+    return newRel.copy(newRel.getTraitSet(), newInputs);
+  }
+
+  @Override
+  public RelNode parseToRel(String sqlStatement, QueryParameters 
queryParameters)
+      throws ParseException, SqlConversionException {
+    Preconditions.checkArgument(
+        queryParameters.getKind() == Kind.NONE,
+        "Beam SQL Calcite dialect does not yet support query parameters.");
+    try {
+      SqlNode parsed = planner.parse(sqlStatement);
+      TableResolutionUtils.setupCustomTableResolution(connection, parsed);
+      SqlNode validated = planner.validate(parsed);
+      // root of original logical plan
+      RelRoot root = planner.rel(validated);
+      return root.rel;
+    } catch (RelConversionException e) {
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      throw new ParseException(String.format("Unable to parse query %s", 
sqlStatement), e);
+    } finally {
+      planner.close();
+    }

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Calling `planner.close()` in the `finally` block of `parseToRel` is 
premature and critical. It closes the Calcite planner immediately after 
parsing, which invalidates the returned `RelNode`'s cluster and planner state 
before the caller can pass it to `convertToBeamRel`. It also prevents reusing 
the `BeamSqlEnv` or planner for any subsequent operations. The `finally` block 
should be removed here, leaving the planner open for the subsequent physical 
planning stage.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java:
##########
@@ -39,9 +41,29 @@ public interface QueryPlanner {
   BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters 
queryParameters)
       throws ParseException, SqlConversionException;
 
+  /** It parses and validate the input query, then convert into a {@link 
BeamRelNode} tree. */
+  BeamRelNode convertToBeamRel(RelNode sqlStatement, QueryParameters 
queryParameters)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The parameter name `sqlStatement` is misleading because the type is 
`RelNode` (which represents a logical plan/relational expression, not a raw SQL 
string). It should be renamed to `relNode` or `logicalPlan` to match its type 
and purpose.
   
   ```suggestion
     BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters 
queryParameters)
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, 
QueryParameters queryPa
                 relNode,
                 new ParameterBinder(root.rel.getCluster().getRexBuilder(), 
queryParameters));
       }
-      LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+      return convertToBeamRel(relNode, queryParameters);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   To preserve the query's collation (e.g., for `ORDER BY` clauses) and avoid a 
regression, pass `root.collation` to the helper method.
   
   ```suggestion
         return convertToBeamRel(relNode, queryParameters, root.collation);
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -90,11 +98,52 @@ public class CalciteQueryPlanner implements QueryPlanner {
 
   private final Planner planner;
   private final JdbcConnection connection;
+  private final FrameworkConfig config;
+
+  // Cannot be final because of wacky initialization logic
+  private RelOptCluster relOptCluster;
+  private CalciteCatalogReader catalogReader;
+  private RelDataTypeFactory typeFactory;
+  private RelOptPlanner calcitePlanner;
 
   /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */
   public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> 
ruleSets) {
     this.connection = connection;
-    this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
+    this.config = defaultConfig(connection, ruleSets);
+    this.planner = Frameworks.getPlanner(config);
+
+    Frameworks.withPlanner(
+        (cluster, relOptSchema, rootSchema) -> {
+          // CAPTURE THE COMPONENTS HERE
+          this.relOptCluster = cluster;
+          this.catalogReader = (CalciteCatalogReader) relOptSchema;
+          this.typeFactory = cluster.getTypeFactory();
+          this.calcitePlanner = cluster.getPlanner();
+
+          // ... any other setup from the original lambda ...
+          // e.g., planner.setExecutor(executor);
+
+          return null;
+        },
+        config);
+
+    if (this.relOptCluster == null || this.catalogReader == null) {
+      throw new IllegalStateException("Failed to initialize Calcite 
components");
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This `Frameworks.withPlanner` block is used solely to initialize the unused 
and unsafe fields (`relOptCluster`, `catalogReader`, etc.). Since those fields 
are unused and their underlying planner is closed when this block exits, this 
entire block and the subsequent null check should be removed to simplify the 
constructor.



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java:
##########
@@ -203,12 +298,59 @@ public BeamRelNode convertToBeamRel(String sqlStatement, 
QueryParameters queryPa
                 relNode,
                 new ParameterBinder(root.rel.getCluster().getRexBuilder(), 
queryParameters));
       }
-      LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(root.rel));
+      return convertToBeamRel(relNode, queryParameters);
+    } catch (RelConversionException | CannotPlanException e) {
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      throw new ParseException(String.format("Unable to parse query %s", 
sqlStatement), e);
+    } finally {
+      planner.close();
+    }
+  }
+
+  private static RelNode bindParameters(RelNode rel, RexShuttle binder) {
+    RelNode newRel = rel.accept(binder);
+    java.util.List<RelNode> newInputs = new java.util.ArrayList<>();
+    for (RelNode input : newRel.getInputs()) {
+      newInputs.add(bindParameters(input, binder));
+    }
+    return newRel.copy(newRel.getTraitSet(), newInputs);
+  }
+
+  @Override
+  public RelNode parseToRel(String sqlStatement, QueryParameters 
queryParameters)
+      throws ParseException, SqlConversionException {
+    Preconditions.checkArgument(
+        queryParameters.getKind() == Kind.NONE,
+        "Beam SQL Calcite dialect does not yet support query parameters.");
+    try {
+      SqlNode parsed = planner.parse(sqlStatement);
+      TableResolutionUtils.setupCustomTableResolution(connection, parsed);
+      SqlNode validated = planner.validate(parsed);
+      // root of original logical plan
+      RelRoot root = planner.rel(validated);
+      return root.rel;
+    } catch (RelConversionException e) {
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      throw new ParseException(String.format("Unable to parse query %s", 
sqlStatement), e);
+    } finally {
+      planner.close();
+    }
+  }
+
+  @Override
+  public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters 
queryParameters) {
+    RelNode beamRelNode;
+    try {
+      LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode));
       RelTraitSet desiredTraits =
           relNode
               .getTraitSet()
               .replace(BeamLogicalConvention.INSTANCE)
-              .replace(root.collation)
+              // .replace(root.collation)
               .simplify();
       // beam physical plan
       relNode

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Commenting out `// .replace(root.collation)` causes a regression because the 
query's collation is discarded during physical planning. To fix this and keep 
the API clean, introduce a private helper that accepts an optional 
`RelCollation` and applies it if present.
   
   ```java
     @Override
     public BeamRelNode convertToBeamRel(RelNode relNode, QueryParameters 
queryParameters) {
       return convertToBeamRel(relNode, queryParameters, null);
     }
   
     private BeamRelNode convertToBeamRel(
         RelNode relNode,
         QueryParameters queryParameters,
         @Nullable 
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.RelCollation 
collation) {
       RelNode beamRelNode;
       try {
         LOG.info("SQLPlan>\n{}", BeamSqlRelUtils.explainLazily(relNode));
         RelTraitSet desiredTraits =
             relNode
                 .getTraitSet()
                 .replace(BeamLogicalConvention.INSTANCE);
         if (collation != null) {
           desiredTraits = desiredTraits.replace(collation);
         }
         desiredTraits = desiredTraits.simplify();
         // beam physical plan
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to