yihua commented on code in PR #10137: URL: https://github.com/apache/hudi/pull/10137#discussion_r1411117772
########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java: ########## @@ -123,4 +125,11 @@ private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema return null; } } + + @Override + public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) { + UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(AvroConversionUtils.convertAvroSchemaToStructType(from), Review Comment: Use `HoodieInternalRowUtils.getCachedSchema(schema)` to avoid parsing Avro schema per record? ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java: ########## @@ -449,7 +449,8 @@ private static void validateRow(InternalRow data, StructType schema) { data instanceof HoodieInternalRow || data instanceof GenericInternalRow || data instanceof SpecificInternalRow - || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data)); + || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data)) + || data instanceof JoinedRow; Review Comment: Partition value is empty for non-partitioned tables. ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala: ########## @@ -77,14 +78,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned } }).asInstanceOf[ClosableIterator[InternalRow]] } else { - if (baseFileReader.isEmpty) { - throw new IllegalArgumentException("Base file reader is missing when instantiating " - + "SparkFileFormatInternalRowReaderContext."); + val key = generateKey(dataSchema, requiredSchema) Review Comment: nit: rename to `schemaPairHashKey` ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala: ########## @@ -49,8 +51,7 @@ import scala.collection.mutable * not required for reading a file group with only log files. * @param partitionValues The values for a partition in which the file group lives. */ -class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[PartitionedFile => Iterator[InternalRow]], - partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext { +class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]]) extends BaseSparkInternalRowReaderContext { Review Comment: Add docs on `readerMaps` ########## hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java: ########## @@ -218,17 +220,102 @@ private static boolean isProjectionOfInternal(Schema sourceSchema, return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema); } + public static Option<Schema.Field> findNestedField(Schema schema, String fieldName) { + return findNestedField(schema, fieldName.split("\\."), 0); + } + + private static Option<Schema.Field> findNestedField(Schema schema, String[] fieldParts, int index) { + if (schema.getType().equals(Schema.Type.UNION)) { + Option<Schema.Field> notUnion = findNestedField(resolveNullableSchema(schema), fieldParts, index); + if (!notUnion.isPresent()) { + return Option.empty(); + } + Schema.Field nu = notUnion.get(); + return Option.of(new Schema.Field(nu.name(), nu.schema(), nu.doc(), nu.defaultVal())); + } + if (fieldParts.length <= index) { + return Option.empty(); + } + + Schema.Field foundField = schema.getField(fieldParts[index]); + if (foundField == null) { + return Option.empty(); + } + + if (index == fieldParts.length - 1) { + return Option.of(new Schema.Field(foundField.name(), foundField.schema(), foundField.doc(), foundField.defaultVal())); + } + + Schema foundSchema = foundField.schema(); + Option<Schema.Field> nestedPart = findNestedField(foundSchema, fieldParts, index + 1); + if (!nestedPart.isPresent()) { + return Option.empty(); + } + //temporary, need to match HoodieFileGroupReaderBasedParquetFileFormat for now + return nestedPart; + /* + boolean isUnion = false; + if (foundSchema.getType().equals(Schema.Type.UNION)) { + isUnion = true; + foundSchema = resolveNullableSchema(foundSchema); + } + + Schema newSchema = Schema.createRecord(foundSchema.getName(), foundSchema.getDoc(), foundSchema.getNamespace(), false, Collections.singletonList(nestedPart.get())); + return Option.of(new Schema.Field(foundField.name(), isUnion ? createNullableSchema(newSchema) : newSchema, foundField.doc(), foundField.defaultVal())); + */ Review Comment: Remove unused code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org