TimothyDing commented on code in PR #10068:
URL: https://github.com/apache/gravitino/pull/10068#discussion_r2890395548


##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +193,875 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(
+        String.format("CREATE TABLE %s%s%s (%s", HOLO_QUOTE, tableName, 
HOLO_QUOTE, NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s%s%s", HOLO_QUOTE, column.name(), 
HOLO_QUOTE));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(Object::toString)
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> String.format("    %s", entry))
+              .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+      sqlBuilder.append(String.format("%s)", NEW_LINE));
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      String escapedComment = comment.replace("'", "''");
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(
+              String.format(
+                  "COMMENT ON TABLE %s%s%s IS '%s';",
+                  HOLO_QUOTE, tableName, HOLO_QUOTE, escapedComment));
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn -> {
+              String escapedColComment = jdbcColumn.comment().replace("'", 
"''");
+              sqlBuilder
+                  .append(NEW_LINE)
+                  .append(
+                      String.format(
+                          "COMMENT ON COLUMN %s%s%s.%s%s%s IS '%s';",
+                          HOLO_QUOTE,
+                          tableName,
+                          HOLO_QUOTE,
+                          HOLO_QUOTE,
+                          jdbcColumn.name(),
+                          HOLO_QUOTE,
+                          escapedColComment));
+            });
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  /**
+   * Appends the index clauses to the column list of a CREATE TABLE statement.
+   *
+   * <p>Every index is written as its own entry, preceded by a comma and a line break so that it
+   * continues the column list already present in the builder. Only {@code PRIMARY_KEY} indexes
+   * are accepted.
+   *
+   * @param indexes the indexes to render; {@code null} or an empty array appends nothing
+   * @param sqlBuilder the SQL builder to append to
+   * @throws IllegalArgumentException if an index has a type other than {@code PRIMARY_KEY}, or
+   *     if {@code getIndexFieldStr} rejects its field names
+   */
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    // No indexes requested: nothing to render, and a null array must not cause an NPE.
+    if (ArrayUtils.isEmpty(indexes)) {
+      return;
+    }
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      // Build the clause before touching the builder so that a rejected index never leaves a
+      // dangling "," behind.
+      String clause;
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          clause = String.format("PRIMARY KEY (%s)", fieldStr);
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Hologres only supports PRIMARY_KEY index, but got: " + index.type());
+      }
+      sqlBuilder.append(String.format(",%s", NEW_LINE)).append(clause);
+    }
+  }
+
+  /**
+   * Renders index field names as a comma-separated list of quoted column identifiers.
+   *
+   * @param fieldNames the index fields; every entry must hold exactly one name part
+   * @return the quoted column names joined by {@code ", "}
+   * @throws IllegalArgumentException if an entry is {@code null}, empty, or has more than one
+   *     name part (nested fields are not supported)
+   */
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              // Guard the colNames[0] access below: fail with a descriptive message instead
+              // of a raw NullPointerException / ArrayIndexOutOfBoundsException.
+              if (colNames == null || colNames.length == 0) {
+                throw new IllegalArgumentException(
+                    "Index field name must not be empty in Hologres");
+              }
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return String.format("%s%s%s", HOLO_QUOTE, colNames[0], HOLO_QUOTE);
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Writes the partition clause of a CREATE TABLE statement into the given builder.
+   *
+   * <p>Depending on {@code isLogicalPartition}, one of two forms is produced:
+   *
+   * <ul>
+   *   <li>Physical partition table: {@code PARTITION BY LIST(column)}, exactly one column
+   *   <li>Logical partition table (V3.1+): {@code LOGICAL PARTITION BY LIST(column1[,
+   *       column2])}, one or two columns
+   * </ul>
+   *
+   * <p>The clause is preceded by a line break, and each column name is wrapped in the Hologres
+   * identifier quote.
+   *
+   * @param partitioning the partition transforms; must contain exactly one LIST transform
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   * @throws IllegalArgumentException if any of the constraints above is violated or a partition
+   *     column is given as a nested field name
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1,
+        "Hologres only supports single partition transform, but got %s",
+        partitioning.length);
+
+    Transform transform = partitioning[0];
+    Preconditions.checkArgument(
+        transform instanceof Transforms.ListTransform,
+        "Hologres only supports LIST partitioning, but got %s",
+        transform.getClass().getSimpleName());
+
+    String[][] partitionFields = ((Transforms.ListTransform) transform).fieldNames();
+    int fieldCount = partitionFields.length;
+    Preconditions.checkArgument(fieldCount > 0, "Partition columns must not be empty");
+
+    // The allowed number of partition columns differs between the two table flavours.
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldCount <= 2,
+          "Logical partition table supports at most 2 partition columns, but got: %s",
+          fieldCount);
+    } else {
+      Preconditions.checkArgument(
+          fieldCount == 1,
+          "Physical partition table supports exactly 1 partition column, but got: %s",
+          fieldCount);
+    }
+
+    // Quote every column in declaration order, rejecting nested field references.
+    List<String> quotedColumns = new ArrayList<>();
+    for (String[] nameParts : partitionFields) {
+      Preconditions.checkArgument(
+          nameParts.length == 1, "Hologres partition does not support nested field names");
+      quotedColumns.add(String.format("%s%s%s", HOLO_QUOTE, nameParts[0], HOLO_QUOTE));
+    }
+    String partitionColumns = String.join(", ", quotedColumns);
+
+    String clauseTemplate =
+        isLogicalPartition ? "LOGICAL PARTITION BY LIST(%s)" : "PARTITION BY LIST(%s)";
+    sqlBuilder.append(NEW_LINE);
+    sqlBuilder.append(String.format(clauseTemplate, partitionColumns));
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Handle generated (stored computed) columns:
+    // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints.
+    if (column.defaultValue() instanceof UnparsedExpression) {
+      String expr = ((UnparsedExpression) 
column.defaultValue()).unparsedExpression();
+      sqlBuilder.append(String.format("GENERATED ALWAYS AS (%s) STORED ", 
expr));
+      if (column.nullable()) {
+        sqlBuilder.append("NULL ");
+      } else {
+        sqlBuilder.append("NOT NULL ");
+      }
+      return;
+    }
+
+    // Add NOT NULL if the column is marked as such
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified

Review Comment:
   Fixed in commit a5b7ea03ae306fb94427dfdaeca1f98b5bfae46d.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@gravitino.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to