gemini-code-assist[bot] commented on code in PR #36571:
URL: https://github.com/apache/beam/pull/36571#discussion_r2525195410
##########
sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java:
##########
@@ -31,28 +32,11 @@ public class IcebergCatalog extends InMemoryCatalog {
// other SDKs can make use of it too
private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
- @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+ @VisibleForTesting IcebergCatalogConfig catalogConfig;
Review Comment:

The `catalogConfig` field is now non-final to support `ALTER CATALOG`
operations. While this is necessary, making a previously immutable field
mutable can introduce risks, especially concerning thread safety. The
`catalog()` method in `IcebergCatalogConfig` which this field holds a reference
to is not thread-safe. It would be safer to ensure that all modifications and
accesses to this field are handled in a thread-safe manner if there's any
chance of concurrent access.
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterTable extends SqlAlter implements
BeamSqlParser.ExecutableStatement {
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+ private final SqlIdentifier name;
+ private final @Nullable List<Field> columnsToAdd;
+ private final @Nullable SqlNodeList columnsToDrop;
+ private final @Nullable SqlNodeList partitionsToAdd;
+ private final @Nullable SqlNodeList partitionsToDrop;
+ private final @Nullable SqlNodeList setProps;
+ private final @Nullable SqlNodeList resetProps;
+
+ public SqlAlterTable(
+ SqlParserPos pos,
+ @Nullable String scope,
+ SqlNode name,
+ @Nullable List<Field> columnsToAdd,
+ @Nullable SqlNodeList columnsToDrop,
+ @Nullable SqlNodeList partitionsToAdd,
+ @Nullable SqlNodeList partitionsToDrop,
+ @Nullable SqlNodeList setProps,
+ @Nullable SqlNodeList resetProps) {
+ super(pos, scope);
+ this.name = SqlDdlNodes.getIdentifier(name, pos);
+ this.columnsToAdd = columnsToAdd;
+ this.columnsToDrop = columnsToDrop;
+ this.partitionsToAdd = partitionsToAdd;
+ this.partitionsToDrop = partitionsToDrop;
+ this.setProps = setProps;
+ this.resetProps = resetProps;
+ }
+
+ @Override
+ public void execute(CalcitePrepare.Context context) {
+ final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true,
name);
+ TableName pathOverride = TableName.create(name.toString());
+ Schema schema = pair.left.schema;
+
+ BeamCalciteSchema beamCalciteSchema;
+ if (schema instanceof CatalogManagerSchema) {
+ CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema)
schema;
+ CatalogSchema catalogSchema =
+ pathOverride.catalog() != null
+ ? catalogManagerSchema.getCatalogSchema(pathOverride)
+ : catalogManagerSchema.getCurrentCatalogSchema();
+ beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
+ } else if (schema instanceof BeamCalciteSchema) {
+ beamCalciteSchema = (BeamCalciteSchema) schema;
+ } else {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal(
+ "Attempting to drop a table using unexpected Calcite Schema of
type "
+ + schema.getClass()));
+ }
+
+ if (beamCalciteSchema.getTable(pair.right) == null) {
+ // Table does not exist.
+ throw SqlUtil.newContextException(
+ name.getParserPosition(), RESOURCE.tableNotFound(name.toString()));
+ }
+
+ Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+ List<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+ List<String> columnsToDropList = SqlDdlNodes.getStringList(columnsToDrop);
+ List<String> partitionsToAddList =
SqlDdlNodes.getStringList(partitionsToAdd);
+ List<String> partitionsToDropList =
SqlDdlNodes.getStringList(partitionsToDrop);
+
+ AlterTableOps alterOps =
+
beamCalciteSchema.getTableProvider().alterTable(SqlDdlNodes.name(name));
+
+ if (!setPropsMap.isEmpty() || !resetPropsList.isEmpty()) {
+ validateNonOverlappingProps(setPropsMap, resetPropsList);
+
+ alterOps.updateTableProperties(setPropsMap, resetPropsList);
+ }
+ if (!columnsToDropList.isEmpty() || (columnsToAdd != null &&
!columnsToAdd.isEmpty())) {
+ alterOps.updateSchema(firstNonNull(columnsToAdd,
Collections.emptyList()), columnsToDropList);
+ }
+ if (!partitionsToDropList.isEmpty() || !partitionsToAddList.isEmpty()) {
+ alterOps.updatePartitionSpec(partitionsToAddList, partitionsToDropList);
+ }
+ }
+
+ private void validateNonOverlappingProps(
+ Map<String, String> setPropsMap, Collection<String> resetPropsList) {
+ ImmutableList.Builder<String> overlappingPropsBuilder =
ImmutableList.builder();
+
+
resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+
+ List<String> overlappingProps = overlappingPropsBuilder.build();
+ checkState(
+ overlappingProps.isEmpty(),
+ "Invalid '%s' call: Found overlapping properties between SET and
RESET: %s.",
+ OPERATOR,
+ overlappingProps);
+ }
+
+ @Override
+ public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+ writer.keyword("CATALOG");
+ name.unparse(writer, left, right);
+ if (setProps != null && !setProps.isEmpty()) {
+ writer.keyword("SET");
+ writer.keyword("(");
+ for (int i = 0; i < setProps.size(); i++) {
+ if (i > 0) {
+ writer.keyword(",");
+ }
+ SqlNode property = setProps.get(i);
+ checkState(
+ property instanceof SqlNodeList,
+ String.format(
+ "Unexpected properties entry '%s' of class '%s'", property,
property.getClass()));
+ SqlNodeList kv = ((SqlNodeList) property);
+
+ kv.get(0).unparse(writer, left, right); // key
+ writer.keyword("=");
+ kv.get(1).unparse(writer, left, right); // value
+ }
+ writer.keyword(")");
+ }
+
+ if (resetProps != null) {
+ writer.keyword("RESET");
+ writer.sep("(");
+ for (int i = 0; i < resetProps.size(); i++) {
+ if (i > 0) {
+ writer.sep(",");
+ }
+ SqlNode field = resetProps.get(i);
+ field.unparse(writer, 0, 0);
+ }
+ writer.sep(")");
+ }
+ }
Review Comment:

The `unparseAlterOperation` method appears to be an incorrect copy from
`SqlAlterCatalog` and does not correctly unparse an `ALTER TABLE` statement. It
uses the `CATALOG` keyword instead of `TABLE` and only handles `SET`/`RESET`
clauses for properties, while ignoring `ADD`/`DROP` clauses for columns and
partitions. This should be corrected to fully support unparsing of all `ALTER
TABLE` variations.
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import
org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements
BeamSqlParser.ExecutableStatement {
+ private static final SqlOperator OPERATOR =
+ new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+ private final SqlIdentifier name;
+ private final @Nullable SqlNodeList setProps;
+ private final @Nullable SqlNodeList resetProps;
+
+ public SqlAlterCatalog(
+ SqlParserPos pos,
+ @Nullable String scope,
+ SqlNode name,
+ @Nullable SqlNodeList setProps,
+ @Nullable SqlNodeList resetProps) {
+ super(pos, scope);
+ this.name = SqlDdlNodes.getIdentifier(name, pos);
+ this.setProps = setProps;
+ this.resetProps = resetProps;
+ }
+
+ @Override
+ public void execute(CalcitePrepare.Context context) {
+ final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true,
name);
+ Schema schema = pair.left.schema;
+
+ if (!(schema instanceof CatalogManagerSchema)) {
+ throw SqlUtil.newContextException(
+ name.getParserPosition(),
+ RESOURCE.internal(
+ "Attempting to alter catalog '"
+ + SqlDdlNodes.name(name)
+ + "' with unexpected Calcite Schema of type "
+ + schema.getClass()));
+ }
+
+ CatalogSchema catalogSchema =
+ ((CatalogManagerSchema)
schema).getCatalogSchema(SqlDdlNodes.getString(name));
+
+ Map<String, String> setPropsMap = parseSetProps();
+ Collection<String> resetProps = parseResetProps();
+
+ ImmutableList.Builder<String> overlappingPropsBuilder =
ImmutableList.builder();
+
resetProps.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+ List<String> overlappingProps = overlappingPropsBuilder.build();
+ checkState(
+ overlappingProps.isEmpty(),
+ "Invalid %s call: Found overlapping properties between SET and RESET:
%s.",
+ OPERATOR,
+ overlappingProps);
+
+ catalogSchema.updateProperties(setPropsMap, resetProps);
+ }
Review Comment:

The `execute` method can be simplified by using the new helper methods
`SqlDdlNodes.getStringMap(setProps)` and
`SqlDdlNodes.getStringList(resetProps)` to parse the properties. This avoids
code duplication with `SqlAlterTable` and allows for the removal of the
now-redundant private `parseSetProps` and `parseResetProps` methods.
```java
@Override
public void execute(CalcitePrepare.Context context) {
final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context,
true, name);
Schema schema = pair.left.schema;
if (!(schema instanceof CatalogManagerSchema)) {
throw SqlUtil.newContextException(
name.getParserPosition(),
RESOURCE.internal(
"Attempting to alter catalog '"
+ SqlDdlNodes.name(name)
+ "' with unexpected Calcite Schema of type "
+ schema.getClass()));
}
CatalogSchema catalogSchema =
((CatalogManagerSchema)
schema).getCatalogSchema(SqlDdlNodes.getString(name));
Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
Collection<String> resetPropsList =
SqlDdlNodes.getStringList(resetProps);
ImmutableList.Builder<String> overlappingPropsBuilder =
ImmutableList.builder();
resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
List<String> overlappingProps = overlappingPropsBuilder.build();
checkState(
overlappingProps.isEmpty(),
"Invalid %s call: Found overlapping properties between SET and
RESET: %s.",
OPERATOR,
overlappingProps);
catalogSchema.updateProperties(setPropsMap, resetPropsList);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]