[GitHub] [hudi] danny0405 commented on a diff in pull request #8054: [HUDI-5854]Support flink table column comment

2023-07-09 Thread via GitHub


danny0405 commented on code in PR #8054:
URL: https://github.com/apache/hudi/pull/8054#discussion_r1257693948


##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java:
##########
@@ -34,13 +35,21 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
 public class SparkDataSourceTableUtils {
+
+  public static Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
+                                                            int schemaLengthThreshold, MessageType schema) {
+    return getSparkTableProperties(partitionNames, sparkVersion, schemaLengthThreshold, schema, Collections.emptyMap());
+  }
+
   /**
    * Get Spark Sql related table properties. This is used for spark datasource table.
-   * @param schema  The schema to write to the table.
+   *
+   * @param schema The schema to write to the table.
    * @return A new parameters added the spark's table properties.
    */
   public static Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
-                                                            int schemaLengthThreshold, MessageType schema) {
+                                                            int schemaLengthThreshold, MessageType schema,
+                                                            Map<String, String> commentMap) {

Review Comment:
   Where is the new method used?
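
   For context, a minimal, self-contained sketch of how a caller could exercise
   the new commentMap overload (the schema, Spark version, length threshold, and
   comment values below are illustrative, not taken from this PR):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class CommentMapSketch {
  public static void main(String[] args) {
    // Parquet schema with two fields, parsed from a string for brevity.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message spark_schema { optional binary name (UTF8); optional int64 ts; }");

    // Column comments keyed by field name.
    Map<String, String> commentMap = new HashMap<>();
    commentMap.put("name", "user name");

    // New overload: comments are threaded into the serialized Spark schema
    // properties; the old signature delegates here with Collections.emptyMap().
    Map<String, String> props = SparkDataSourceTableUtils.getSparkTableProperties(
        Arrays.asList("ts"), "3.3.1", 4000, schema, commentMap);
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}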






[GitHub] [hudi] danny0405 commented on a diff in pull request #8054: [HUDI-5854]Support flink table column comment

2023-07-09 Thread via GitHub


danny0405 commented on code in PR #8054:
URL: https://github.com/apache/hudi/pull/8054#discussion_r1257693688


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java:
##########
@@ -176,45 +177,6 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
     }
   }
 
-  /**
-   * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
-   */
-  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
-    List<FieldSchema> columns = new ArrayList<>();

Review Comment:
   Can we modify these methods directly instead of removing them and adding new ones?






[GitHub] [hudi] danny0405 commented on a diff in pull request #8054: [HUDI-5854]Support flink table column comment

2023-02-27 Thread via GitHub


danny0405 commented on code in PR #8054:
URL: https://github.com/apache/hudi/pull/8054#discussion_r1118459723


##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java:
##########
@@ -34,25 +38,43 @@
 public class Parquet2SparkSchemaUtils {
 
   public static String convertToSparkSchemaJson(GroupType parquetSchema) {
+    return convertToSparkSchemaJson(parquetSchema, Collections.emptyMap());
+  }
+
+  public static String convertToSparkSchemaJson(GroupType parquetSchema, Map<String, String> commentMap) {
     String fieldsJsonString = parquetSchema.getFields().stream().map(field -> {
       switch (field.getRepetition()) {
         case OPTIONAL:
           return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field)
-              + ",\"nullable\":true,\"metadata\":{}}";
+              + ",\"nullable\":true,\"metadata\":{" + getCommentStr(commentMap.get(field.getName())) + "}}";
         case REQUIRED:
           return "{\"name\":\"" + field.getName() + "\",\"type\":" + convertFieldType(field)
-              + ",\"nullable\":false,\"metadata\":{}}";
+              + ",\"nullable\":false,\"metadata\":{" + getCommentStr(commentMap.get(field.getName())) + "}}";
         case REPEATED:
           String arrayType = arrayType(field, false);
           return "{\"name\":\"" + field.getName() + "\",\"type\":" + arrayType
-              + ",\"nullable\":false,\"metadata\":{}}";
+              + ",\"nullable\":false,\"metadata\":{" + getCommentStr(commentMap.get(field.getName())) + "}}";
         default:
           throw new UnsupportedOperationException("Unsupport convert " + field + " to spark sql type");
       }
     }).reduce((a, b) -> a + "," + b).orElse("");
     return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}";
   }
 
+  /**
+   * get comment
+   *
+   * @param comment
+   * @return
+   */
+  private static String getCommentStr(String comment) {
+    String result = "";

Review Comment:
   What are these Spark changes used for?
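
   The diff snippet cuts off at the review anchor, so the body of getCommentStr
   is not fully shown. A plausible completion, inferred from how its result is
   spliced into the field's "metadata":{...} object (an empty string preserves
   the previous {} output), would be:

  // Inferred sketch, not copied from the PR: render the comment as a JSON
  // member of the field's metadata object, or nothing when absent.
  private static String getCommentStr(String comment) {
    String result = "";
    if (comment != null && !comment.isEmpty()) {
      result = "\"comment\":\"" + comment + "\"";
    }
    return result;
  }

   Note that a production version would also need to JSON-escape the comment
   text (quotes, backslashes, control characters).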






[GitHub] [hudi] danny0405 commented on a diff in pull request #8054: [HUDI-5854]Support flink table column comment

2023-02-27 Thread via GitHub


danny0405 commented on code in PR #8054:
URL: https://github.com/apache/hudi/pull/8054#discussion_r1118454738


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java:
##########
@@ -251,4 +213,44 @@ public static Pair<List<String>, List<String>> splitSchemaByPartitionKeys(
     }
     return Pair.of(regularColumns, partitionColumns);
   }
+
+  /**
+   * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
+   */
+  public static List<FieldSchema> toHiveFieldSchemaWithResolved(ResolvedSchema schema, boolean withOperationField) {
+    List<FieldSchema> columns = new ArrayList<>();
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
+    }
+
+    for (String metaField : metaFields) {
+      columns.add(new FieldSchema(metaField, "string", null));
+    }
+    columns.addAll(createHiveColumnsWithResolved(schema));
+    return columns;
+  }
+
+  /**
+   * Create Hive columns from Flink table schema.
+   */
+  private static List<FieldSchema> createHiveColumnsWithResolved(ResolvedSchema schema) {
+    final DataType dataType = schema.toPhysicalRowDataType();
+    List<Column> sourceColumns = schema.getColumns();
+    final RowType rowType = (RowType) dataType.getLogicalType();
+    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+    final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);
+
+    List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+    for (int i = 0; i < fieldNames.length; i++) {
+      columns.add(
Review Comment:
   Why just move these methods around?
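
   The snippet above also cuts off inside the loop. A plausible shape for the
   loop body, inferred from the surrounding declarations (the toHiveTypeInfo
   helper and the comment accessor are assumptions, not confirmed by this
   diff), would be:

    for (int i = 0; i < fieldNames.length; i++) {
      columns.add(new FieldSchema(
          fieldNames[i],
          // Assumed existing type-conversion helper in HiveSchemaUtils.
          toHiveTypeInfo(fieldTypes[i]).getTypeName(),
          // Flink's Column exposes the column comment as Optional<String>.
          sourceColumns.get(i).getComment().orElse(null)));
    }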


