yihua commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1411117772


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -123,4 +125,11 @@ private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema
       return null;
     }
   }
+
+  @Override
+  public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+    UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(AvroConversionUtils.convertAvroSchemaToStructType(from),

Review Comment:
   Use `HoodieInternalRowUtils.getCachedSchema(schema)` to avoid parsing Avro schema per record?
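
   A minimal sketch of that suggestion, assuming `getCachedSchema` returns the cached `StructType` for an Avro schema (mirroring the cached projection call already shown in the hunk above); not a definitive implementation:

   ```java
   // Sketch only: route both schemas through the cached converter instead of
   // calling AvroConversionUtils.convertAvroSchemaToStructType on each projection.
   @Override
   public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
     UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(
         HoodieInternalRowUtils.getCachedSchema(from),
         HoodieInternalRowUtils.getCachedSchema(to));
     return projection::apply;
   }
   ```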



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -449,7 +449,8 @@ private static void validateRow(InternalRow data, StructType schema) {
         data instanceof HoodieInternalRow
             || data instanceof GenericInternalRow
             || data instanceof SpecificInternalRow
-            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+            || data instanceof JoinedRow;

Review Comment:
   Partition value is empty for non-partitioned tables.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -77,14 +78,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
           }
         }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      if (baseFileReader.isEmpty) {
-        throw new IllegalArgumentException("Base file reader is missing when instantiating "
-          + "SparkFileFormatInternalRowReaderContext.");
+      val key = generateKey(dataSchema, requiredSchema)

Review Comment:
   nit: rename to `schemaPairHashKey`



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -49,8 +51,7 @@ import scala.collection.mutable
 *                        not required for reading a file group with only log files.
 * @param partitionValues The values for a partition in which the file group lives.
  */
-class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[PartitionedFile => Iterator[InternalRow]],
-                                              partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]]) extends BaseSparkInternalRowReaderContext {

Review Comment:
   Add docs on `readerMaps`



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -218,17 +220,102 @@ private static boolean isProjectionOfInternal(Schema sourceSchema,
     return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
   }
 
+  public static Option<Schema.Field> findNestedField(Schema schema, String fieldName) {
+    return findNestedField(schema, fieldName.split("\\."), 0);
+  }
+
+  private static Option<Schema.Field> findNestedField(Schema schema, String[] fieldParts, int index) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      Option<Schema.Field> notUnion = findNestedField(resolveNullableSchema(schema), fieldParts, index);
+      if (!notUnion.isPresent()) {
+        return Option.empty();
+      }
+      Schema.Field nu = notUnion.get();
+      return Option.of(new Schema.Field(nu.name(), nu.schema(), nu.doc(), nu.defaultVal()));
+    }
+    if (fieldParts.length <= index) {
+      return Option.empty();
+    }
+
+    Schema.Field foundField = schema.getField(fieldParts[index]);
+    if (foundField == null) {
+      return Option.empty();
+    }
+
+    if (index == fieldParts.length - 1) {
+      return Option.of(new Schema.Field(foundField.name(), foundField.schema(), foundField.doc(), foundField.defaultVal()));
+    }
+
+    Schema foundSchema = foundField.schema();
+    Option<Schema.Field> nestedPart = findNestedField(foundSchema, fieldParts, index + 1);
+    if (!nestedPart.isPresent()) {
+      return Option.empty();
+    }
+    //temporary, need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+    return nestedPart;
+    /*
+    boolean isUnion = false;
+    if (foundSchema.getType().equals(Schema.Type.UNION)) {
+      isUnion = true;
+      foundSchema = resolveNullableSchema(foundSchema);
+    }
+
+    Schema newSchema = Schema.createRecord(foundSchema.getName(), foundSchema.getDoc(), foundSchema.getNamespace(), false, Collections.singletonList(nestedPart.get()));
+    return Option.of(new Schema.Field(foundField.name(), isUnion ? createNullableSchema(newSchema) : newSchema, foundField.doc(), foundField.defaultVal()));
+     */

Review Comment:
   Remove unused code.
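
   For readers skimming the thread, a hypothetical call to the new helper (the schema and field name are illustrative, not from the PR):

   ```java
   // Sketch: resolve a dotted nested field name against an Avro record schema.
   Option<Schema.Field> city = AvroSchemaUtils.findNestedField(recordSchema, "address.city");
   if (city.isPresent()) {
     Schema citySchema = city.get().schema(); // schema of the nested leaf field
   }
   ```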



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
