Copilot commented on code in PR #10068:
URL: https://github.com/apache/gravitino/pull/10068#discussion_r2888113047


##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(
+        String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE, NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s%s%s", HOLO_QUOTE, column.name(), 
HOLO_QUOTE));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> String.format("    %s", entry))
+              .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+      sqlBuilder.append(String.format("%s)", NEW_LINE));
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      String escapedComment = comment.replace("'", "''");
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(
+              String.format(
+                  "COMMENT ON TABLE %s%s%s IS '%s';",
+                  HOLO_QUOTE, tableName, HOLO_QUOTE, escapedComment));
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn -> {
+              String escapedColComment = jdbcColumn.comment().replace("'", 
"''");
+              sqlBuilder
+                  .append(NEW_LINE)
+                  .append(
+                      String.format(
+                          "COMMENT ON COLUMN %s%s%s.%s%s%s IS '%s';",
+                          HOLO_QUOTE,
+                          tableName,
+                          HOLO_QUOTE,
+                          HOLO_QUOTE,
+                          jdbcColumn.name(),
+                          HOLO_QUOTE,
+                          escapedColComment));
+            });
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(String.format(",%s", NEW_LINE));
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Hologres only supports PRIMARY_KEY index, but got: " + 
index.type());
+      }
+    }
+  }
+
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return String.format("%s%s%s", HOLO_QUOTE, colNames[0], 
HOLO_QUOTE);
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Append the partitioning clause to the CREATE TABLE SQL.
+   *
+   * <p>Hologres supports two types of partition tables:
+   *
+   * <ul>
+   *   <li>Physical partition table: uses {@code PARTITION BY LIST(column)} 
syntax
+   *   <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY 
LIST(column1[,
+   *       column2])} syntax
+   * </ul>
+   *
+   * @param partitioning the partition transforms (only LIST partitioning is 
supported)
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder 
sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1,
+        "Hologres only supports single partition transform, but got %s",
+        partitioning.length);
+    Preconditions.checkArgument(
+        partitioning[0] instanceof Transforms.ListTransform,
+        "Hologres only supports LIST partitioning, but got %s",
+        partitioning[0].getClass().getSimpleName());
+
+    Transforms.ListTransform listTransform = (Transforms.ListTransform) 
partitioning[0];
+    String[][] fieldNames = listTransform.fieldNames();
+
+    Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must 
not be empty");
+
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldNames.length <= 2,
+          "Logical partition table supports at most 2 partition columns, but 
got: %s",
+          fieldNames.length);
+    } else {
+      Preconditions.checkArgument(
+          fieldNames.length == 1,
+          "Physical partition table supports exactly 1 partition column, but 
got: %s",
+          fieldNames.length);
+    }
+
+    String partitionColumns =
+        Arrays.stream(fieldNames)
+            .map(
+                colNames -> {
+                  Preconditions.checkArgument(
+                      colNames.length == 1,
+                      "Hologres partition does not support nested field 
names");
+                  return String.format("%s%s%s", HOLO_QUOTE, colNames[0], 
HOLO_QUOTE);
+                })
+            .collect(Collectors.joining(", "));
+
+    sqlBuilder.append(NEW_LINE);
+    if (isLogicalPartition) {
+      sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)", 
partitionColumns));
+    } else {
+      sqlBuilder.append(String.format("PARTITION BY LIST(%s)", 
partitionColumns));
+    }
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Handle generated (stored computed) columns:
+    // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints.
+    if (column.defaultValue() instanceof UnparsedExpression) {
+      String expr = ((UnparsedExpression) 
column.defaultValue()).unparsedExpression();
+      sqlBuilder.append(String.format("GENERATED ALWAYS AS (%s) STORED ", 
expr));
+      if (column.nullable()) {
+        sqlBuilder.append("NULL ");
+      } else {
+        sqlBuilder.append("NOT NULL ");
+      }
+      return;
+    }
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified

Review Comment:
   `appendColumnDefinition` treats any `UnparsedExpression` default value as a 
generated column (`GENERATED ALWAYS AS ... STORED`) and returns early, which 
breaks normal default-expression handling (e.g., `now()`, `random()`) that 
other JDBC catalogs represent as `UnparsedExpression` via their 
`*ColumnDefaultValueConverter`. This will silently change semantics from 
“DEFAULT expr” to a stored computed column. Consider removing this 
special-casing and letting `appendDefaultValue()` handle `UnparsedExpression`, 
or introduce an explicit, separate signal/field for generated columns so 
defaults and generated columns aren’t conflated.
   ```suggestion
       // Add NULL / NOT NULL constraint
       if (column.nullable()) {
         sqlBuilder.append("NULL ");
       } else {
         sqlBuilder.append("NOT NULL ");
       }
   
       // Add DEFAULT value if specified. This also handles UnparsedExpression defaults,
       // ensuring they are treated as normal DEFAULT expressions rather than generated columns.
   ```



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(
+        String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE, NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s%s%s", HOLO_QUOTE, column.name(), 
HOLO_QUOTE));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));

Review Comment:
   The generated SQL for the WITH clause interpolates arbitrary `properties` 
keys/values directly into the DDL (`%s = '%s'`) without any 
escaping/validation. Since JDBC table properties come from user input and are 
not restricted by `JdbcTablePropertiesMetadata`, this can produce invalid SQL 
when values contain single-quotes and also opens a SQL-injection vector in DDL 
generation. Recommend validating property keys against an allow-list of 
supported Hologres properties and escaping single-quotes in values (at 
minimum), or otherwise rejecting unsafe keys/values.
   ```suggestion
         String escapedDistributionColumns = distributionColumns.replace("'", 
"''");
         withEntries.add(String.format("distribution_key = '%s'", 
escapedDistributionColumns));
       }
   
       // Add user-specified properties (filter out read-only / internally-handled properties)
       if (MapUtils.isNotEmpty(properties)) {
         // Only allow known-safe Hologres table properties to be emitted into the WITH clause.
         Set<String> allowedTableProperties =
             ImmutableSet.of(
                 "orientation",
                 "distribution_key",
                 "clustering_key",
                 "event_time_column",
                 "bitmap_columns",
                 "dictionary_encoding_columns",
                 "time_to_live_in_seconds",
                 "table_group");
   
         properties.forEach(
             (key, value) -> {
               if (!EXCLUDED_TABLE_PROPERTIES.contains(key)
                   && allowedTableProperties.contains(key)
                   && value != null) {
                 String escapedValue = value.replace("'", "''");
                 withEntries.add(String.format("%s = '%s'", key, escapedValue));
   ```



##########
catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresExceptionConverter;
+import org.apache.gravitino.catalog.hologres.converter.HologresTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link HologresTableOperations}. */
+public class TestHologresTableOperations {
+
+  // ==================== Helper inner class for SQL generation tests 
====================
+
+  private static class TestableHologresTableOperations extends 
HologresTableOperations {
+    public TestableHologresTableOperations() {
+      super.exceptionMapper = new HologresExceptionConverter();
+      super.typeConverter = new HologresTypeConverter();
+      super.columnDefaultValueConverter = new 
HologresColumnDefaultValueConverter();
+    }
+
+    public String createTableSql(
+        String tableName,
+        JdbcColumn[] columns,
+        String comment,
+        Map<String, String> properties,
+        Transform[] partitioning,
+        Distribution distribution,
+        Index[] indexes) {
+      return generateCreateTableSql(
+          tableName, columns, comment, properties, partitioning, distribution, 
indexes);
+    }
+
+    public String renameTableSql(String oldName, String newName) {
+      return generateRenameTableSql(oldName, newName);
+    }
+
+    public String dropTableSql(String tableName) {
+      return generateDropTableSql(tableName);
+    }
+
+    public String purgeTableSql(String tableName) {
+      return generatePurgeTableSql(tableName);
+    }
+  }
+
+  private final TestableHologresTableOperations ops = new 
TestableHologresTableOperations();
+
+  // ==================== appendPartitioningSql tests ====================
+
+  @Test
+  void testAppendPartitioningSqlPhysicalPartition() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendPartitioningSql(partitioning, false, 
sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertTrue(result.contains("PARTITION BY LIST(\"ds\")"));
+    assertFalse(result.contains("LOGICAL"));
+  }
+
+  @Test
+  void testAppendPartitioningSqlLogicalPartitionSingleColumn() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendPartitioningSql(partitioning, true, 
sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+  }
+
+  @Test
+  void testAppendPartitioningSqlLogicalPartitionTwoColumns() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"region"}, 
{"ds"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendPartitioningSql(partitioning, true, 
sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertTrue(result.contains("LOGICAL PARTITION BY LIST(\"region\", 
\"ds\")"));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsNonListTransform() {
+    Transform[] partitioning = {Transforms.identity("col1")};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlPhysicalRejectsMultipleColumns() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, 
{"col2"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlLogicalRejectsMoreThanTwoColumns() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, 
{"col2"}, {"col3"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
true, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsEmptyPartitions() {
+    Transform[] partitioning = {Transforms.list(new String[][] {})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsMultipleTransforms() {
+    Transform[] partitioning = {
+      Transforms.list(new String[][] {{"col1"}}), Transforms.list(new 
String[][] {{"col2"}})
+    };
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsNestedFieldNames() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"schema", 
"col1"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  // ==================== appendIndexesSql tests ====================
+
+  @Test
+  void testAppendIndexesSqlEmpty() {
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendIndexesSql(new Index[0], sqlBuilder);
+    assertTrue(sqlBuilder.toString().isEmpty());
+  }
+
+  @Test
+  void testAppendIndexesSqlPrimaryKey() {
+    // Hologres does not support custom constraint names, so the name is 
ignored
+    Index pk = Indexes.primary("pk_test", new String[][] {{"id"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertFalse(result.contains("CONSTRAINT"));
+    assertTrue(result.contains("PRIMARY KEY (\"id\")"));
+  }
+
+  @Test
+  void testAppendIndexesSqlPrimaryKeyWithoutName() {
+    Index pk = Indexes.primary(null, new String[][] {{"id"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertFalse(result.contains("CONSTRAINT"));
+    assertTrue(result.contains("PRIMARY KEY (\"id\")"));
+  }
+
+  @Test
+  void testAppendIndexesSqlUniqueKeyThrows() {
+    // Hologres does not support UNIQUE KEY index separately
+    Index uk = Indexes.unique("uk_name", new String[][] {{"name"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendIndexesSql(new Index[] {uk}, 
sqlBuilder));
+  }
+
+  @Test
+  void testAppendIndexesSqlCompositeKey() {
+    Index pk = Indexes.primary("pk_comp", new String[][] {{"id"}, {"ds"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    HologresTableOperations.appendIndexesSql(new Index[] {pk}, sqlBuilder);
+    String result = sqlBuilder.toString();
+    assertTrue(result.contains("PRIMARY KEY (\"id\", \"ds\")"));
+  }
+
+  @Test
+  void testAppendIndexesSqlMultipleIndexesWithUniqueKeyThrows() {
+    // Hologres does not support UNIQUE KEY, so mixing PK and UK should throw
+    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+    Index uk = Indexes.unique("uk_email", new String[][] {{"email"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendIndexesSql(new Index[] {pk, uk}, 
sqlBuilder));
+  }
+
+  // ==================== generateCreateTableSql tests ====================
+
+  @Test
+  void testCreateTableBasic() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("CREATE TABLE \"test_table\""));
+    assertTrue(sql.contains("\"id\" int4"));
+    assertTrue(sql.contains("NOT NULL"));
+  }
+
+  @Test
+  void testCreateTableWithComment() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            "This is a test table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("COMMENT ON TABLE \"test_table\" IS 'This is a 
test table'"));
+  }
+
+  @Test
+  void testCreateTableWithColumnComment() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .withComment("Primary key column")
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("COMMENT ON COLUMN \"test_table\".\"id\" IS 
'Primary key column'"));
+  }
+
+  @Test
+  void testCreateTableWithDistribution() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Distribution dist = Distributions.hash(0, NamedReference.field("id"));
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            dist,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("distribution_key = 'id'"));
+    assertTrue(sql.contains("WITH ("));
+  }
+
+  @Test
+  void testCreateTableWithProperties() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("orientation", "column");
+    properties.put("time_to_live_in_seconds", "3600");
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("orientation = 'column'"));
+    assertTrue(sql.contains("time_to_live_in_seconds = '3600'"));
+  }
+
+  @Test
+  void testCreateTableDistributionKeyFilteredFromProperties() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("distribution_key", "should_be_ignored");
+    properties.put("orientation", "column");
+    Distribution dist = Distributions.hash(0, NamedReference.field("id"));
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            dist,
+            Indexes.EMPTY_INDEXES);
+    // Should use the Distribution parameter, not the property
+    assertTrue(sql.contains("distribution_key = 'id'"));
+    assertFalse(sql.contains("should_be_ignored"));
+  }
+
+  @Test
+  void testCreateTableIsLogicalPartitionedFilteredFromProperties() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("is_logical_partitioned_table", "true");
+    properties.put("orientation", "column");
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // is_logical_partitioned_table should NOT appear in WITH clause
+    assertFalse(sql.contains("is_logical_partitioned_table"));
+    assertTrue(sql.contains("orientation = 'column'"));
+  }
+
+  @Test
+  void testCreateTablePrimaryKeyFilteredFromProperties() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("id")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("primary_key", "id");
+    properties.put("orientation", "column");
+    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new Index[] {pk});
+    // primary_key should NOT appear in WITH clause (it is defined via PRIMARY 
KEY constraint)
+    assertFalse(sql.contains("primary_key"));
+    assertTrue(sql.contains("orientation = 'column'"));
+    assertTrue(sql.contains("PRIMARY KEY"));
+  }
+
+  @Test
+  void testCreateTableWithPhysicalPartition() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            null,
+            Collections.emptyMap(),
+            partitioning,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("PARTITION BY LIST(\"ds\")"));
+    assertFalse(sql.contains("LOGICAL"));
+  }
+
+  /**
+   * With {@code is_logical_partitioned_table=true}, a LIST transform must be rendered as a
+   * LOGICAL PARTITION BY LIST clause.
+   */
+  @Test
+  void testCreateTableWithLogicalPartition() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    JdbcColumn dsColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.DateType.get())
+            .withName("ds")
+            .build();
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {dsColumn},
+            null,
+            tableProps,
+            new Transform[] {listByDs},
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+  }
+
+  /**
+   * A logical partition over two columns must list both quoted column names, in the order
+   * they were given to the LIST transform.
+   */
+  @Test
+  void testCreateTableWithLogicalPartitionTwoColumns() {
+    JdbcColumn regionColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.StringType.get())
+            .withName("region")
+            .build();
+    JdbcColumn dsColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.DateType.get())
+            .withName("ds")
+            .build();
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    Transform listByRegionAndDs = Transforms.list(new String[][] {{"region"}, {"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {regionColumn, dsColumn},
+            null,
+            tableProps,
+            new Transform[] {listByRegionAndDs},
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
+  }
+
+  /**
+   * A primary index must be emitted as a PRIMARY KEY clause over the quoted column, and the
+   * index name must not be rendered as a CONSTRAINT.
+   */
+  @Test
+  void testCreateTableWithPrimaryKey() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.LongType.get())
+            .withName("id")
+            .build();
+    Index[] tableIndexes = {Indexes.primary("pk_id", new String[][] {{"id"}})};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            tableIndexes);
+
+    assertTrue(ddl.contains("PRIMARY KEY (\"id\")"));
+    // Hologres does not support custom constraint names, so no CONSTRAINT keyword is expected.
+    assertFalse(ddl.contains("CONSTRAINT"));
+  }
+
+  /**
+   * An auto-increment column must make SQL generation fail with an IllegalArgumentException
+   * whose message mentions "auto-increment".
+   */
+  @Test
+  void testCreateTableWithAutoIncrementThrows() {
+    JdbcColumn autoIncrementColumn =
+        JdbcColumn.builder()
+            .withAutoIncrement(true)
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    JdbcColumn[] tableColumns = {autoIncrementColumn};
+
+    IllegalArgumentException thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                ops.createTableSql(
+                    "test_table",
+                    tableColumns,
+                    null,
+                    Collections.emptyMap(),
+                    Transforms.EMPTY_TRANSFORM,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("auto-increment"));
+  }
+
+  /** A nullable column must carry NULL, and never NOT NULL, in the generated DDL. */
+  @Test
+  void testCreateTableWithNullableColumn() {
+    JdbcColumn nameColumn =
+        JdbcColumn.builder()
+            .withNullable(true)
+            .withType(Types.StringType.get())
+            .withName("name")
+            .build();
+    JdbcColumn[] tableColumns = {nameColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("NULL"));
+    assertFalse(ddl.contains("NOT NULL"));
+  }
+
+  /** An integer literal default of 0 must be rendered as {@code DEFAULT 0}. */
+  @Test
+  void testCreateTableWithDefaultValue() {
+    JdbcColumn statusColumn =
+        JdbcColumn.builder()
+            .withDefaultValue(Literals.integerLiteral(0))
+            .withNullable(true)
+            .withType(Types.IntegerType.get())
+            .withName("status")
+            .build();
+    JdbcColumn[] tableColumns = {statusColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("DEFAULT 0"));
+  }
+
+  /**
+   * Three columns of different types must each be rendered with the asserted type name
+   * (int8, text, numeric(10,2)), and the table comment must be emitted as COMMENT ON TABLE.
+   */
+  @Test
+  void testCreateTableMultipleColumns() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.LongType.get())
+            .withName("id")
+            .build();
+    JdbcColumn nameColumn =
+        JdbcColumn.builder()
+            .withNullable(true)
+            .withType(Types.StringType.get())
+            .withName("name")
+            .build();
+    JdbcColumn amountColumn =
+        JdbcColumn.builder()
+            .withDefaultValue(
+                Literals.decimalLiteral(
+                    org.apache.gravitino.rel.types.Decimal.of("0.00", 10, 2)))
+            .withNullable(true)
+            .withType(Types.DecimalType.of(10, 2))
+            .withName("amount")
+            .build();
+    JdbcColumn[] tableColumns = {idColumn, nameColumn, amountColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "orders",
+            tableColumns,
+            "Order table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("\"id\" int8"));
+    assertTrue(ddl.contains("\"name\" text"));
+    assertTrue(ddl.contains("\"amount\" numeric(10,2)"));
+    assertTrue(ddl.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+  }
+
+  /**
+   * Single quotes inside table and column comments must be doubled in the generated COMMENT
+   * statements, and the unescaped form must not appear.
+   */
+  @Test
+  void testCreateTableCommentWithSingleQuotes() {
+    JdbcColumn flagColumn =
+        JdbcColumn.builder()
+            .withComment("退货标志('R'=已退货, 'A'=未退货)")
+            .withNullable(false)
+            .withType(Types.StringType.get())
+            .withName("flag")
+            .build();
+    JdbcColumn[] tableColumns = {flagColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            "It's a test table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    // Table comment: the apostrophe is doubled.
+    assertTrue(ddl.contains("IS 'It''s a test table'"));
+    // Column comment: every embedded quote is doubled.
+    assertTrue(ddl.contains("IS '退货标志(''R''=已退货, ''A''=未退货)'"));
+    // The raw, unescaped table comment must be absent.
+    assertFalse(ddl.contains("IS 'It's"));
+  }
+
+  /**
+   * Combines a logical partition, a hash distribution, a table property, a composite primary
+   * key and a table comment in one statement, and checks that each appears in the DDL while
+   * the {@code is_logical_partitioned_table} property itself is not emitted.
+   */
+  @Test
+  void testCreateTableFullFeatured() {
+    JdbcColumn orderIdColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.LongType.get())
+            .withName("order_id")
+            .build();
+    JdbcColumn dsColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.DateType.get())
+            .withName("ds")
+            .build();
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    tableProps.put("orientation", "column");
+    Distribution hashByOrderId = Distributions.hash(0, NamedReference.field("order_id"));
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+    Index compositeKey = Indexes.primary("pk_order", new String[][] {{"order_id"}, {"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "orders",
+            new JdbcColumn[] {orderIdColumn, dsColumn},
+            "Order table",
+            tableProps,
+            new Transform[] {listByDs},
+            hashByOrderId,
+            new Index[] {compositeKey});
+
+    assertTrue(ddl.contains("CREATE TABLE \"orders\""));
+    assertTrue(ddl.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+    assertTrue(ddl.contains("distribution_key = 'order_id'"));
+    assertTrue(ddl.contains("orientation = 'column'"));
+    assertTrue(ddl.contains("PRIMARY KEY (\"order_id\", \"ds\")"));
+    assertTrue(ddl.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+    assertFalse(ddl.contains("is_logical_partitioned_table"));
+  }
+
+  // ==================== generateRenameTableSql tests ====================
+
+  /** Renaming must produce one ALTER TABLE ... RENAME TO statement with both names quoted. */
+  @Test
+  void testRenameTableSql() {
+    String expected = "ALTER TABLE \"old_table\" RENAME TO \"new_table\"";
+    assertEquals(expected, ops.renameTableSql("old_table", "new_table"));
+  }
+
+  // ==================== generateDropTableSql tests ====================
+
+  /** Dropping must produce a DROP TABLE statement with the table name quoted. */
+  @Test
+  void testDropTableSql() {
+    String expected = "DROP TABLE \"my_table\"";
+    assertEquals(expected, ops.dropTableSql("my_table"));
+  }
+
+  // ==================== generatePurgeTableSql tests ====================
+
+  @Test
+  void testPurgeTableSqlThrowsException() {
+    assertThrows(UnsupportedOperationException.class, () -> 
ops.purgeTableSql("my_table"));
+  }
+
+  // ==================== updateColumnAutoIncrement not supported 
====================
+  // Note: Hologres does not support altering column auto-increment via ALTER 
TABLE.
+  // The UpdateColumnAutoIncrement change type is rejected in 
generateAlterTableSql().
+  // CREATE TABLE with auto-increment (GENERATED BY DEFAULT AS IDENTITY) is 
also not supported
+  // and will throw an exception — see 
testCreateTableWithAutoIncrementThrows() above.
+
+  // ==================== addIndex / deleteIndex not supported 
====================
+  // Note: Hologres does not support adding or deleting indexes via ALTER 
TABLE.
+  // The AddIndex and DeleteIndex change types are rejected in 
generateAlterTableSql().
+  // CREATE TABLE with a PRIMARY KEY constraint is still supported
+  // — see testCreateTableWithPrimaryKey() above. UNIQUE KEY indexes are rejected
+  // — see testAppendIndexesSqlUniqueKeyThrows().

Review Comment:
   There are no unit tests exercising the new `generateAlterTableSql()` 
behavior (e.g., add/drop/rename column, update table/column comments, and 
expected exceptions for unsupported change types). Given the amount of new 
logic in `HologresTableOperations`, tests should cover at least the supported 
ALTER paths plus the rejection paths to prevent regressions.



##########
catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/operation/TestHologresTableOperations.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.hologres.converter.HologresExceptionConverter;
+import org.apache.gravitino.catalog.hologres.converter.HologresTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link HologresTableOperations}. */
+public class TestHologresTableOperations {
+
+  // ==================== Helper inner class for SQL generation tests 
====================
+
+  /**
+   * Test-only subclass of {@link HologresTableOperations}. Its constructor installs the
+   * Hologres converters directly on the inherited fields, and its public methods delegate to
+   * the inherited {@code generate*Sql} methods so the tests in this class can call them.
+   */
+  private static class TestableHologresTableOperations extends HologresTableOperations {
+    public TestableHologresTableOperations() {
+      this.typeConverter = new HologresTypeConverter();
+      this.columnDefaultValueConverter = new HologresColumnDefaultValueConverter();
+      this.exceptionMapper = new HologresExceptionConverter();
+    }
+
+    /** Delegates to {@code generateCreateTableSql} with the arguments unchanged. */
+    public String createTableSql(
+        String tableName,
+        JdbcColumn[] columns,
+        String comment,
+        Map<String, String> properties,
+        Transform[] partitioning,
+        Distribution distribution,
+        Index[] indexes) {
+      final String createSql =
+          this.generateCreateTableSql(
+              tableName, columns, comment, properties, partitioning, distribution, indexes);
+      return createSql;
+    }
+
+    /** Delegates to {@code generateRenameTableSql}. */
+    public String renameTableSql(String oldName, String newName) {
+      final String renameSql = this.generateRenameTableSql(oldName, newName);
+      return renameSql;
+    }
+
+    /** Delegates to {@code generateDropTableSql}. */
+    public String dropTableSql(String tableName) {
+      final String dropSql = this.generateDropTableSql(tableName);
+      return dropSql;
+    }
+
+    /** Delegates to {@code generatePurgeTableSql}. */
+    public String purgeTableSql(String tableName) {
+      final String purgeSql = this.generatePurgeTableSql(tableName);
+      return purgeSql;
+    }
+  }
+
+  // Shared fixture through which the tests call the create/rename/drop/purge SQL generators.
+  private final TestableHologresTableOperations ops = new 
TestableHologresTableOperations();
+
+  // ==================== appendPartitioningSql tests ====================
+
+  /** A single-column LIST transform with the logical flag off yields PARTITION BY LIST. */
+  @Test
+  void testAppendPartitioningSqlPhysicalPartition() {
+    StringBuilder out = new StringBuilder();
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+
+    HologresTableOperations.appendPartitioningSql(new Transform[] {listByDs}, false, out);
+
+    String clause = out.toString();
+    assertTrue(clause.contains("PARTITION BY LIST(\"ds\")"));
+    assertFalse(clause.contains("LOGICAL"));
+  }
+
+  /** A single-column LIST transform with the logical flag on yields LOGICAL PARTITION BY. */
+  @Test
+  void testAppendPartitioningSqlLogicalPartitionSingleColumn() {
+    StringBuilder out = new StringBuilder();
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+
+    HologresTableOperations.appendPartitioningSql(new Transform[] {listByDs}, true, out);
+
+    String clause = out.toString();
+    assertTrue(clause.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+  }
+
+  /** A two-column logical LIST transform must list both quoted columns, comma-separated. */
+  @Test
+  void testAppendPartitioningSqlLogicalPartitionTwoColumns() {
+    StringBuilder out = new StringBuilder();
+    Transform listByRegionAndDs = Transforms.list(new String[][] {{"region"}, {"ds"}});
+
+    HologresTableOperations.appendPartitioningSql(
+        new Transform[] {listByRegionAndDs}, true, out);
+
+    String clause = out.toString();
+    assertTrue(clause.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsNonListTransform() {
+    Transform[] partitioning = {Transforms.identity("col1")};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlPhysicalRejectsMultipleColumns() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, 
{"col2"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlLogicalRejectsMoreThanTwoColumns() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"col1"}, 
{"col2"}, {"col3"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
true, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsEmptyPartitions() {
+    Transform[] partitioning = {Transforms.list(new String[][] {})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsMultipleTransforms() {
+    Transform[] partitioning = {
+      Transforms.list(new String[][] {{"col1"}}), Transforms.list(new 
String[][] {{"col2"}})
+    };
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  @Test
+  void testAppendPartitioningSqlRejectsNestedFieldNames() {
+    Transform[] partitioning = {Transforms.list(new String[][] {{"schema", 
"col1"}})};
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendPartitioningSql(partitioning, 
false, sqlBuilder));
+  }
+
+  // ==================== appendIndexesSql tests ====================
+
+  /** An empty index array must append nothing to the builder. */
+  @Test
+  void testAppendIndexesSqlEmpty() {
+    Index[] noIndexes = new Index[0];
+    StringBuilder out = new StringBuilder();
+
+    HologresTableOperations.appendIndexesSql(noIndexes, out);
+
+    assertTrue(out.toString().isEmpty());
+  }
+
+  /**
+   * A named primary index must be rendered as PRIMARY KEY over the quoted column, without a
+   * CONSTRAINT keyword (Hologres does not support custom constraint names).
+   */
+  @Test
+  void testAppendIndexesSqlPrimaryKey() {
+    StringBuilder out = new StringBuilder();
+    Index[] indexes = {Indexes.primary("pk_test", new String[][] {{"id"}})};
+
+    HologresTableOperations.appendIndexesSql(indexes, out);
+
+    String clause = out.toString();
+    assertFalse(clause.contains("CONSTRAINT"));
+    assertTrue(clause.contains("PRIMARY KEY (\"id\")"));
+  }
+
+  /** A primary index with a null name must still be rendered as a plain PRIMARY KEY clause. */
+  @Test
+  void testAppendIndexesSqlPrimaryKeyWithoutName() {
+    StringBuilder out = new StringBuilder();
+    Index[] indexes = {Indexes.primary(null, new String[][] {{"id"}})};
+
+    HologresTableOperations.appendIndexesSql(indexes, out);
+
+    String clause = out.toString();
+    assertFalse(clause.contains("CONSTRAINT"));
+    assertTrue(clause.contains("PRIMARY KEY (\"id\")"));
+  }
+
+  @Test
+  void testAppendIndexesSqlUniqueKeyThrows() {
+    // Hologres does not support UNIQUE KEY index separately
+    Index uk = Indexes.unique("uk_name", new String[][] {{"name"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendIndexesSql(new Index[] {uk}, 
sqlBuilder));
+  }
+
+  /** A two-column primary index must list both quoted columns inside PRIMARY KEY (...). */
+  @Test
+  void testAppendIndexesSqlCompositeKey() {
+    StringBuilder out = new StringBuilder();
+    Index[] indexes = {Indexes.primary("pk_comp", new String[][] {{"id"}, {"ds"}})};
+
+    HologresTableOperations.appendIndexesSql(indexes, out);
+
+    String clause = out.toString();
+    assertTrue(clause.contains("PRIMARY KEY (\"id\", \"ds\")"));
+  }
+
+  @Test
+  void testAppendIndexesSqlMultipleIndexesWithUniqueKeyThrows() {
+    // Hologres does not support UNIQUE KEY, so mixing PK and UK should throw
+    Index pk = Indexes.primary("pk_id", new String[][] {{"id"}});
+    Index uk = Indexes.unique("uk_email", new String[][] {{"email"}});
+    StringBuilder sqlBuilder = new StringBuilder();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.appendIndexesSql(new Index[] {pk, uk}, 
sqlBuilder));
+  }
+
+  // ==================== generateCreateTableSql tests ====================
+
+  /**
+   * The minimal case: one non-null integer column must yield a CREATE TABLE statement with
+   * the quoted table name, an int4 column and a NOT NULL marker.
+   */
+  @Test
+  void testCreateTableBasic() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    JdbcColumn[] tableColumns = {idColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("CREATE TABLE \"test_table\""));
+    assertTrue(ddl.contains("\"id\" int4"));
+    assertTrue(ddl.contains("NOT NULL"));
+  }
+
+  /** A table comment must be emitted as a COMMENT ON TABLE statement for the quoted table. */
+  @Test
+  void testCreateTableWithComment() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    JdbcColumn[] tableColumns = {idColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            "This is a test table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("COMMENT ON TABLE \"test_table\" IS 'This is a test table'"));
+  }
+
+  /** A column comment must be emitted as COMMENT ON COLUMN for the quoted table and column. */
+  @Test
+  void testCreateTableWithColumnComment() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withComment("Primary key column")
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    JdbcColumn[] tableColumns = {idColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(
+        ddl.contains("COMMENT ON COLUMN \"test_table\".\"id\" IS 'Primary key column'"));
+  }
+
+  /** A hash distribution on a column must appear as a distribution_key entry inside WITH. */
+  @Test
+  void testCreateTableWithDistribution() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    Distribution hashById = Distributions.hash(0, NamedReference.field("id"));
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            hashById,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("distribution_key = 'id'"));
+    assertTrue(ddl.contains("WITH ("));
+  }
+
+  /** Plain table properties must be rendered as {@code key = 'value'} entries. */
+  @Test
+  void testCreateTableWithProperties() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("orientation", "column");
+    tableProps.put("time_to_live_in_seconds", "3600");
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            tableProps,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("orientation = 'column'"));
+    assertTrue(ddl.contains("time_to_live_in_seconds = '3600'"));
+  }
+
+  /**
+   * When both a Distribution argument and a {@code distribution_key} property are given, the
+   * DDL must use the Distribution argument and must not contain the property's value.
+   */
+  @Test
+  void testCreateTableDistributionKeyFilteredFromProperties() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("distribution_key", "should_be_ignored");
+    tableProps.put("orientation", "column");
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    Distribution hashById = Distributions.hash(0, NamedReference.field("id"));
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            tableProps,
+            Transforms.EMPTY_TRANSFORM,
+            hashById,
+            Indexes.EMPTY_INDEXES);
+
+    // The Distribution parameter wins over the property.
+    assertTrue(ddl.contains("distribution_key = 'id'"));
+    assertFalse(ddl.contains("should_be_ignored"));
+  }
+
+  /**
+   * The {@code is_logical_partitioned_table} property must not be emitted in the DDL, while
+   * other properties given alongside it still are.
+   */
+  @Test
+  void testCreateTableIsLogicalPartitionedFilteredFromProperties() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    tableProps.put("orientation", "column");
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            tableProps,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    // The flag itself must not leak into the WITH clause.
+    assertFalse(ddl.contains("is_logical_partitioned_table"));
+    assertTrue(ddl.contains("orientation = 'column'"));
+  }
+
+  /**
+   * A {@code primary_key} property must not be emitted in the DDL; the key is expressed by
+   * the PRIMARY KEY constraint generated from the index instead.
+   */
+  @Test
+  void testCreateTablePrimaryKeyFilteredFromProperties() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("primary_key", "id");
+    tableProps.put("orientation", "column");
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    Index[] tableIndexes = {Indexes.primary("pk_id", new String[][] {{"id"}})};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            tableProps,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            tableIndexes);
+
+    // primary_key must not appear as a WITH entry; it is defined via the constraint.
+    assertFalse(ddl.contains("primary_key"));
+    assertTrue(ddl.contains("orientation = 'column'"));
+    assertTrue(ddl.contains("PRIMARY KEY"));
+  }
+
+  /** A LIST transform without the logical flag must emit a plain PARTITION BY LIST clause. */
+  @Test
+  void testCreateTableWithPhysicalPartition() {
+    JdbcColumn[] tableColumns = {
+      JdbcColumn.builder()
+          .withNullable(false)
+          .withType(Types.DateType.get())
+          .withName("ds")
+          .build()
+    };
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            new Transform[] {listByDs},
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("PARTITION BY LIST(\"ds\")"));
+    assertFalse(ddl.contains("LOGICAL"));
+  }
+
+  /**
+   * With {@code is_logical_partitioned_table=true}, a LIST transform must be rendered as a
+   * LOGICAL PARTITION BY LIST clause.
+   */
+  @Test
+  void testCreateTableWithLogicalPartition() {
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    JdbcColumn dsColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.DateType.get())
+            .withName("ds")
+            .build();
+    Transform listByDs = Transforms.list(new String[][] {{"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {dsColumn},
+            null,
+            tableProps,
+            new Transform[] {listByDs},
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+  }
+
+  /**
+   * A logical partition over two columns must list both quoted column names, in the order
+   * they were given to the LIST transform.
+   */
+  @Test
+  void testCreateTableWithLogicalPartitionTwoColumns() {
+    JdbcColumn regionColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.StringType.get())
+            .withName("region")
+            .build();
+    JdbcColumn dsColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.DateType.get())
+            .withName("ds")
+            .build();
+    Map<String, String> tableProps = new HashMap<>();
+    tableProps.put("is_logical_partitioned_table", "true");
+    Transform listByRegionAndDs = Transforms.list(new String[][] {{"region"}, {"ds"}});
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {regionColumn, dsColumn},
+            null,
+            tableProps,
+            new Transform[] {listByRegionAndDs},
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("LOGICAL PARTITION BY LIST(\"region\", \"ds\")"));
+  }
+
+  /**
+   * A primary index must be emitted as a PRIMARY KEY clause over the quoted column, and the
+   * index name must not be rendered as a CONSTRAINT.
+   */
+  @Test
+  void testCreateTableWithPrimaryKey() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.LongType.get())
+            .withName("id")
+            .build();
+    Index[] tableIndexes = {Indexes.primary("pk_id", new String[][] {{"id"}})};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {idColumn},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            tableIndexes);
+
+    assertTrue(ddl.contains("PRIMARY KEY (\"id\")"));
+    // Hologres does not support custom constraint names, so no CONSTRAINT keyword is expected.
+    assertFalse(ddl.contains("CONSTRAINT"));
+  }
+
+  /**
+   * An auto-increment column must make SQL generation fail with an IllegalArgumentException
+   * whose message mentions "auto-increment".
+   */
+  @Test
+  void testCreateTableWithAutoIncrementThrows() {
+    JdbcColumn autoIncrementColumn =
+        JdbcColumn.builder()
+            .withAutoIncrement(true)
+            .withNullable(false)
+            .withType(Types.IntegerType.get())
+            .withName("id")
+            .build();
+    JdbcColumn[] tableColumns = {autoIncrementColumn};
+
+    IllegalArgumentException thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                ops.createTableSql(
+                    "test_table",
+                    tableColumns,
+                    null,
+                    Collections.emptyMap(),
+                    Transforms.EMPTY_TRANSFORM,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("auto-increment"));
+  }
+
+  /** A nullable column must carry NULL, and never NOT NULL, in the generated DDL. */
+  @Test
+  void testCreateTableWithNullableColumn() {
+    JdbcColumn nameColumn =
+        JdbcColumn.builder()
+            .withNullable(true)
+            .withType(Types.StringType.get())
+            .withName("name")
+            .build();
+    JdbcColumn[] tableColumns = {nameColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("NULL"));
+    assertFalse(ddl.contains("NOT NULL"));
+  }
+
+  /** An integer literal default of 0 must be rendered as {@code DEFAULT 0}. */
+  @Test
+  void testCreateTableWithDefaultValue() {
+    JdbcColumn statusColumn =
+        JdbcColumn.builder()
+            .withDefaultValue(Literals.integerLiteral(0))
+            .withNullable(true)
+            .withType(Types.IntegerType.get())
+            .withName("status")
+            .build();
+    JdbcColumn[] tableColumns = {statusColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "test_table",
+            tableColumns,
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("DEFAULT 0"));
+  }
+
+  /**
+   * Three columns of different types must each be rendered with the asserted type name
+   * (int8, text, numeric(10,2)), and the table comment must be emitted as COMMENT ON TABLE.
+   */
+  @Test
+  void testCreateTableMultipleColumns() {
+    JdbcColumn idColumn =
+        JdbcColumn.builder()
+            .withNullable(false)
+            .withType(Types.LongType.get())
+            .withName("id")
+            .build();
+    JdbcColumn nameColumn =
+        JdbcColumn.builder()
+            .withNullable(true)
+            .withType(Types.StringType.get())
+            .withName("name")
+            .build();
+    JdbcColumn amountColumn =
+        JdbcColumn.builder()
+            .withDefaultValue(
+                Literals.decimalLiteral(
+                    org.apache.gravitino.rel.types.Decimal.of("0.00", 10, 2)))
+            .withNullable(true)
+            .withType(Types.DecimalType.of(10, 2))
+            .withName("amount")
+            .build();
+    JdbcColumn[] tableColumns = {idColumn, nameColumn, amountColumn};
+
+    String ddl =
+        ops.createTableSql(
+            "orders",
+            tableColumns,
+            "Order table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+
+    assertTrue(ddl.contains("\"id\" int8"));
+    assertTrue(ddl.contains("\"name\" text"));
+    assertTrue(ddl.contains("\"amount\" numeric(10,2)"));
+    assertTrue(ddl.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+  }
+
+  @Test
+  void testCreateTableCommentWithSingleQuotes() {
+    JdbcColumn col =
+        JdbcColumn.builder()
+            .withName("flag")
+            .withType(Types.StringType.get())
+            .withNullable(false)
+            .withComment("退货标志('R'=已退货, 'A'=未退货)")
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_table",
+            new JdbcColumn[] {col},
+            "It's a test table",
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    // Single quotes in table comment should be escaped
+    assertTrue(sql.contains("IS 'It''s a test table'"));
+    // Single quotes in column comment should be escaped
+    assertTrue(sql.contains("IS '退货标志(''R''=已退货, ''A''=未退货)'"));
+    // Unescaped single quotes should NOT appear
+    assertFalse(sql.contains("IS 'It's"));
+  }
+
+  @Test
+  void testCreateTableFullFeatured() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("order_id")
+            .withType(Types.LongType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("is_logical_partitioned_table", "true");
+    properties.put("orientation", "column");
+    Distribution dist = Distributions.hash(0, 
NamedReference.field("order_id"));
+    Transform[] partitioning = {Transforms.list(new String[][] {{"ds"}})};
+    Index pk = Indexes.primary("pk_order", new String[][] {{"order_id"}, 
{"ds"}});
+    String sql =
+        ops.createTableSql(
+            "orders",
+            new JdbcColumn[] {col1, col2},
+            "Order table",
+            properties,
+            partitioning,
+            dist,
+            new Index[] {pk});
+    assertTrue(sql.contains("CREATE TABLE \"orders\""));
+    assertTrue(sql.contains("LOGICAL PARTITION BY LIST(\"ds\")"));
+    assertTrue(sql.contains("distribution_key = 'order_id'"));
+    assertTrue(sql.contains("orientation = 'column'"));
+    assertTrue(sql.contains("PRIMARY KEY (\"order_id\", \"ds\")"));
+    assertTrue(sql.contains("COMMENT ON TABLE \"orders\" IS 'Order table'"));
+    assertFalse(sql.contains("is_logical_partitioned_table"));
+  }
+
+  // ==================== generateRenameTableSql tests ====================
+
+  @Test
+  void testRenameTableSql() {
+    String sql = ops.renameTableSql("old_table", "new_table");
+    assertEquals("ALTER TABLE \"old_table\" RENAME TO \"new_table\"", sql);
+  }
+
+  // ==================== generateDropTableSql tests ====================
+
+  @Test
+  void testDropTableSql() {
+    String sql = ops.dropTableSql("my_table");
+    assertEquals("DROP TABLE \"my_table\"", sql);
+  }
+
+  // ==================== generatePurgeTableSql tests ====================
+
+  @Test
+  void testPurgeTableSqlThrowsException() {
+    assertThrows(UnsupportedOperationException.class, () -> 
ops.purgeTableSql("my_table"));
+  }
+
+  // ==================== updateColumnAutoIncrement not supported ====================
+  // Note: Hologres does not support altering column auto-increment via ALTER TABLE.
+  // The UpdateColumnAutoIncrement change type is rejected in generateAlterTableSql().
+  // CREATE TABLE with auto-increment (GENERATED BY DEFAULT AS IDENTITY) is also not supported
+  // and will throw an exception — see testCreateTableWithAutoIncrementThrows() above.
+
+  // ==================== addIndex / deleteIndex not supported ====================
+  // Note: Hologres does not support adding or deleting indexes via ALTER TABLE.
+  // The AddIndex and DeleteIndex change types are rejected in generateAlterTableSql().
+  // CREATE TABLE with PRIMARY KEY and UNIQUE KEY constraints is still supported
+  // — see testCreateTableWithPrimaryKey() above.
+
+  // ==================== getIndexFieldStr tests ====================
+
+  @Test
+  void testGetIndexFieldStrSingleColumn() {
+    String result = HologresTableOperations.getIndexFieldStr(new String[][] 
{{"id"}});
+    assertEquals("\"id\"", result);
+  }
+
+  @Test
+  void testGetIndexFieldStrMultipleColumns() {
+    String result = HologresTableOperations.getIndexFieldStr(new String[][] 
{{"id"}, {"name"}});
+    assertEquals("\"id\", \"name\"", result);
+  }
+
+  @Test
+  void testGetIndexFieldStrRejectsNestedColumn() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> HologresTableOperations.getIndexFieldStr(new String[][] 
{{"schema", "column"}}));
+  }
+
+  // ==================== calculateDatetimePrecision tests ====================
+
+  @Test
+  void testCalculateDatetimePrecisionTimestamp() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, 6);
+    assertEquals(6, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTimestamptz() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMPTZ", 0, 3);
+    assertEquals(3, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTime() {
+    Integer result = ops.calculateDatetimePrecision("TIME", 0, 0);
+    assertEquals(0, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionTimetz() {
+    Integer result = ops.calculateDatetimePrecision("TIMETZ", 0, 6);
+    assertEquals(6, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionNonDatetime() {
+    Integer result = ops.calculateDatetimePrecision("INT4", 0, 0);
+    assertEquals(null, result);
+  }
+
+  @Test
+  void testCalculateDatetimePrecisionNegativeScaleReturnsZero() {
+    Integer result = ops.calculateDatetimePrecision("TIMESTAMP", 0, -1);
+    assertEquals(0, result);
+  }
+
+  @Test
+  void testCreateTableWithGeneratedColumn() {
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("order_time")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        JdbcColumn.builder()
+            .withName("ds")
+            .withType(Types.TimestampType.withoutTimeZone())
+            .withNullable(false)
+            .withDefaultValue(UnparsedExpression.of("date_trunc('day'::text, 
order_time)"))
+            .build();
+    String sql =
+        ops.createTableSql(
+            "test_generated",
+            new JdbcColumn[] {col1, col2},
+            null,
+            Collections.emptyMap(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            Indexes.EMPTY_INDEXES);
+    assertTrue(sql.contains("GENERATED ALWAYS AS (date_trunc('day'::text, 
order_time)) STORED"));
+    assertTrue(sql.contains("NOT NULL"));
+    // Should NOT contain DEFAULT for the generated column
+    assertFalse(sql.contains("DEFAULT date_trunc"));
+  }

Review Comment:
   These tests assume that providing a column `defaultValue` as 
`UnparsedExpression` should produce a stored generated column (`GENERATED 
ALWAYS AS ... STORED`). Elsewhere in the codebase, `UnparsedExpression` is used 
to represent default *expressions* (e.g., `now()`, `random()`), so locking in 
this behavior makes it hard to support normal default expressions in Hologres. 
Consider adjusting the tests (and implementation) so `UnparsedExpression` 
remains a DEFAULT expression, and introduce a distinct mechanism for generated 
columns if needed.



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(
+        String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE, NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s%s%s", HOLO_QUOTE, column.name(), 
HOLO_QUOTE));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)

Review Comment:
   `distribution_key` is built from `distribution.expressions()` using 
`Object::toString` and concatenation. `Distribution` can be constructed with 
non-column expressions (functions, nested refs), so this can emit invalid 
`distribution_key` values and potentially allow unsafe characters. Consider 
validating that every expression is a `NamedReference` with a single-part 
`fieldName()` and constructing the value from `fieldName()[0]` (with 
appropriate validation) rather than relying on `toString()`.
   ```suggestion
                 .map(
                     expression -> {
                       Preconditions.checkArgument(
                           expression instanceof NamedReference,
                           "Hologres distribution expressions must be simple column references");
                       String[] fieldNames = ((NamedReference) expression).fieldName();
                       Preconditions.checkArgument(
                           fieldNames != null && fieldNames.length == 1,
                           "Hologres distribution expressions must reference a single column");
                       String columnName = fieldNames[0];
                       Preconditions.checkArgument(
                           columnName != null && !columnName.isEmpty(),
                           "Hologres distribution column name must not be null or empty");
                       Preconditions.checkArgument(
                           columnName.chars()
                               .allMatch(
                                   ch ->
                                       Character.isLetterOrDigit(ch)
                                           || ch == '_'),
                           "Invalid characters in Hologres distribution column name: %s",
                           columnName);
                       return columnName;
                     })
   ```



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(
+        String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE, NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s%s%s", HOLO_QUOTE, column.name(), 
HOLO_QUOTE));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> String.format("    %s", entry))
+              .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+      sqlBuilder.append(String.format("%s)", NEW_LINE));
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      String escapedComment = comment.replace("'", "''");
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(
+              String.format(
+                  "COMMENT ON TABLE %s%s%s IS '%s';",
+                  HOLO_QUOTE, tableName, HOLO_QUOTE, escapedComment));
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn -> {
+              String escapedColComment = jdbcColumn.comment().replace("'", 
"''");
+              sqlBuilder
+                  .append(NEW_LINE)
+                  .append(
+                      String.format(
+                          "COMMENT ON COLUMN %s%s%s.%s%s%s IS '%s';",
+                          HOLO_QUOTE,
+                          tableName,
+                          HOLO_QUOTE,
+                          HOLO_QUOTE,
+                          jdbcColumn.name(),
+                          HOLO_QUOTE,
+                          escapedColComment));
+            });
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(String.format(",%s", NEW_LINE));
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Hologres only supports PRIMARY_KEY index, but got: " + 
index.type());
+      }
+    }
+  }
+
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return String.format("%s%s%s", HOLO_QUOTE, colNames[0], 
HOLO_QUOTE);
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Append the partitioning clause to the CREATE TABLE SQL.
+   *
+   * <p>Hologres supports two types of partition tables:
+   *
+   * <ul>
+   *   <li>Physical partition table: uses {@code PARTITION BY LIST(column)} 
syntax
+   *   <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY 
LIST(column1[,
+   *       column2])} syntax
+   * </ul>
+   *
+   * @param partitioning the partition transforms (only LIST partitioning is 
supported)
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder 
sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1,
+        "Hologres only supports single partition transform, but got %s",
+        partitioning.length);
+    Preconditions.checkArgument(
+        partitioning[0] instanceof Transforms.ListTransform,
+        "Hologres only supports LIST partitioning, but got %s",
+        partitioning[0].getClass().getSimpleName());
+
+    Transforms.ListTransform listTransform = (Transforms.ListTransform) 
partitioning[0];
+    String[][] fieldNames = listTransform.fieldNames();
+
+    Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must 
not be empty");
+
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldNames.length <= 2,
+          "Logical partition table supports at most 2 partition columns, but 
got: %s",
+          fieldNames.length);
+    } else {
+      Preconditions.checkArgument(
+          fieldNames.length == 1,
+          "Physical partition table supports exactly 1 partition column, but 
got: %s",
+          fieldNames.length);
+    }
+
+    String partitionColumns =
+        Arrays.stream(fieldNames)
+            .map(
+                colNames -> {
+                  Preconditions.checkArgument(
+                      colNames.length == 1,
+                      "Hologres partition does not support nested field 
names");
+                  return String.format("%s%s%s", HOLO_QUOTE, colNames[0], 
HOLO_QUOTE);
+                })
+            .collect(Collectors.joining(", "));
+
+    sqlBuilder.append(NEW_LINE);
+    if (isLogicalPartition) {
+      sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)", 
partitionColumns));
+    } else {
+      sqlBuilder.append(String.format("PARTITION BY LIST(%s)", 
partitionColumns));
+    }
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Handle generated (stored computed) columns:
+    // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints.
+    if (column.defaultValue() instanceof UnparsedExpression) {
+      String expr = ((UnparsedExpression) 
column.defaultValue()).unparsedExpression();
+      sqlBuilder.append(String.format("GENERATED ALWAYS AS (%s) STORED ", 
expr));
+      if (column.nullable()) {
+        sqlBuilder.append("NULL ");
+      } else {
+        sqlBuilder.append("NOT NULL ");
+      }
+      return;
+    }
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified
+    appendDefaultValue(column, sqlBuilder);
   }
 
   @Override
-  protected String generateAlterTableSql(
-      String schemaName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "Hologres table alteration will be implemented in a follow-up PR.");
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    return String.format(
+        "%s%s%s%s RENAME TO %s%s%s",
+        ALTER_TABLE, HOLO_QUOTE, oldTableName, HOLO_QUOTE, HOLO_QUOTE, 
newTableName, HOLO_QUOTE);
+  }
+
+  @Override
+  protected String generateDropTableSql(String tableName) {
+    return String.format("DROP TABLE %s%s%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE);
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
         "Hologres does not support purge table in Gravitino, please use drop 
table");
   }
+
+  @Override
+  protected String generateAlterTableSql(
+      String schemaName, String tableName, TableChange... changes) {
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.add(updateCommentDefinition((TableChange.UpdateComment) 
change, lazyLoadTable));
+      } else if (change instanceof TableChange.SetProperty) {
+        throw new IllegalArgumentException("Set property is not supported 
yet");
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {

Review Comment:
   This `generateAlterTableSql` implementation currently throws 
`IllegalArgumentException` for `TableChange.SetProperty` and 
`TableChange.RemoveProperty` (and several other change types), but the PR 
description states ALTER TABLE supports set/remove properties and other 
operations. Either implement the missing change handling here (and add tests), 
or update the PR description to reflect the actual supported ALTER operations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to