[GitHub] [hudi] voonhous commented on a diff in pull request #7761: [MINOR] Standardise schema concepts on Flink Engine

2023-01-29 Thread via GitHub


voonhous commented on code in PR #7761:
URL: https://github.com/apache/hudi/pull/7761#discussion_r1090228713


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java:
##
@@ -50,8 +50,7 @@ public static ClosableIterator getParquetRecordIterator(
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());

Review Comment:
   @trushev Thank you for helping to review this. I've reverted the changes and 
added more javadocs. Please help to take a look + review it again.
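
   For readers skimming this thread, a rough, self-contained sketch of the branching that the per-file schema lookup in the diff above feeds into (the `MergeSchema`, `getRecordIterator`, and row handling below are illustrative stand-ins, not Hudi's actual API): an empty merge schema means the query and file schemas agree and the file can be read as-is, while a non-empty one means rows must be cast from file types to query types while iterating.

   ```java
   import java.util.Iterator;
   import java.util.List;

   // Self-contained sketch only; MergeSchema and the row handling below are
   // stand-ins assumed for illustration, not Hudi's actual classes.
   public class RecordIteratorSketch {

     /** Minimal stand-in for InternalSchema: "empty" means no schema evolution to reconcile. */
     static final class MergeSchema {
       private final boolean empty;

       MergeSchema(boolean empty) {
         this.empty = empty;
       }

       boolean isEmptySchema() {
         return empty;
       }
     }

     static Iterator<String> getRecordIterator(MergeSchema mergeSchema, List<String> fileRows) {
       if (mergeSchema.isEmptySchema()) {
         // Query schema and file schema agree: hand back the plain iterator.
         return fileRows.iterator();
       }
       // Otherwise a cast map derived from the merge schema would convert each
       // row from its file types to the query types while iterating.
       return fileRows.stream().map(row -> row + " (cast to query types)").iterator();
     }

     public static void main(String[] args) {
       List<String> rows = List.of("row-1", "row-2");
       getRecordIterator(new MergeSchema(true), rows).forEachRemaining(System.out::println);
       getRecordIterator(new MergeSchema(false), rows).forEachRemaining(System.out::println);
     }
   }
   ```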



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] voonhous commented on a diff in pull request #7761: [MINOR] Standardise schema concepts on Flink Engine

2023-01-29 Thread via GitHub


voonhous commented on code in PR #7761:
URL: https://github.com/apache/hudi/pull/7761#discussion_r1090219771


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java:
##
@@ -50,8 +50,7 @@ public static ClosableIterator getParquetRecordIterator(
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());

Review Comment:
   My bad, failed to account for this case. Let me revert this + add a few more 
comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] voonhous commented on a diff in pull request #7761: [MINOR] Standardise schema concepts on Flink Engine

2023-01-29 Thread via GitHub


voonhous commented on code in PR #7761:
URL: https://github.com/apache/hudi/pull/7761#discussion_r1090132321


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java:
##
@@ -93,60 +93,62 @@ public InternalSchema getQuerySchema() {
     return querySchema;
   }
 
-  InternalSchema getFileSchema(String fileName) {
+  InternalSchema getMergeSchema(String fileName) {
     if (querySchema.isEmptySchema()) {
       return querySchema;
     }
     long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
-    InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId(
+    InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath, getHadoopConf(), validCommits);
-    if (querySchema.equals(fileSchemaUnmerged)) {
+    if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
     }
-    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema();
+    return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
   }
 
   /**
-   * This method returns a mapping of columns that have type inconsistencies between the fileSchema and querySchema.
+   * This method returns a mapping of columns that have type inconsistencies between the mergeSchema and querySchema.
    * This is done by:
    * 1. Finding the columns with type changes
    * 2. Get a map storing the index of these columns with type changes; Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)
    * 3. For each selectedField with type changes, build a castMap containing the cast/conversion details;
   *     Map of -> (selectedPos, Cast([from] fileType, [to] queryType))
    *
-   * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files
    * @param queryFieldNames array containing the columns of a Hudi Flink table
    * @param queryFieldTypes array containing the field types of the columns of a Hudi Flink table
    * @param selectedFields array containing the index of the columns of interest required (indexes are based on queryFieldNames and queryFieldTypes)
    * @return a castMap containing the information of how to cast a selectedField from the fileType to queryType.
    *
    * @see CastMap
    */
-  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
+  CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
     Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty");
 
     CastMap castMap = new CastMap();
     // map storing the indexes of columns with type changes Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)
-    Map posProxy = getPosProxy(fileSchema, queryFieldNames);
+    Map posProxy = getPosProxy(mergeSchema, queryFieldNames);
     if (posProxy.isEmpty()) {
       // no type changes
       castMap.setFileFieldTypes(queryFieldTypes);
       return castMap;
     }
     List selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList());
-    List fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
-        AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren();
+    // mergeSchema is built with useColumnTypeFromFileSchema = true

Review Comment:
   Hmmm, I added this comment when tracing the code to remind myself WHY 
fetching `fileFieldTypes` from `mergeSchema` works.
   
   On top of that, from line 106, it is pretty clear that the code is building 
the `mergeSchema` with `useColumnTypeFromFileSchema = true`. 
   
   Moving this comment from line 138 to line 106 would change the intention 
behind the comment. 
   
   If this line is at line 106, it would be telling the readers "WHAT" the code 
is doing.
   If it is at line 138, it would be telling the readers "WHY" fetching file 
types from `mergeSchema` works.
   
   So, I don't really think moving this comment is necessary.
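
   To make the WHY concrete, here is a small, self-contained sketch of the relationship being described (the `FieldType`, `SimpleCastMap`, and `buildCastMap` names are assumptions for illustration, not Hudi's actual API): because the merge schema is built with `useColumnTypeFromFileSchema = true`, its per-column types are the file's types, so comparing it position by position against the query types is what tells the cast map which selected fields need a cast when reading a base parquet file.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Self-contained sketch; FieldType and SimpleCastMap are illustrative
   // stand-ins for Hudi's InternalSchema column types and CastMap.
   public class CastMapSketch {

     enum FieldType { INT, LONG, FLOAT, DOUBLE, STRING }

     /** Maps a selected field position to the (fileType -> queryType) cast it needs. */
     static final class SimpleCastMap {
       private final Map<Integer, String> casts = new HashMap<>();

       void add(int selectedPos, FieldType fileType, FieldType queryType) {
         casts.put(selectedPos, fileType + " -> " + queryType);
       }

       @Override
       public String toString() {
         return casts.toString();
       }
     }

     /**
      * mergeFieldTypes plays the role of the merge schema built with
      * useColumnTypeFromFileSchema = true: per column it carries the type that
      * was actually written to the parquet file, so it can be compared directly
      * against the query types to decide which selected fields need casting.
      */
     static SimpleCastMap buildCastMap(FieldType[] mergeFieldTypes, FieldType[] queryFieldTypes, int[] selectedFields) {
       SimpleCastMap castMap = new SimpleCastMap();
       for (int pos : selectedFields) {
         if (mergeFieldTypes[pos] != queryFieldTypes[pos]) {
           castMap.add(pos, mergeFieldTypes[pos], queryFieldTypes[pos]);
         }
       }
       return castMap;
     }

     public static void main(String[] args) {
       // The file was written when col0 was INT; the table has since evolved col0 to LONG.
       FieldType[] mergeTypes = {FieldType.INT, FieldType.STRING};
       FieldType[] queryTypes = {FieldType.LONG, FieldType.STRING};
       System.out.println(buildCastMap(mergeTypes, queryTypes, new int[] {0, 1})); // {0=INT -> LONG}
     }
   }
   ```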



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


