Fokko commented on code in PR #9366:
URL: https://github.com/apache/iceberg/pull/9366#discussion_r1436375930


##########
core/src/main/java/org/apache/iceberg/avro/AvroIterable.java:
##########
@@ -78,7 +79,8 @@ public CloseableIterator<D> iterator() {
     if (start != null) {
       if (reader instanceof SupportsRowPosition) {
         ((SupportsRowPosition) reader)
-            .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
+            .setRowPositionSupplier(
+                Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));

Review Comment:
   Why the `memoize`? Are we reading the same file multiple times?
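   For context on what the wrapper buys us: `Suppliers.memoize` makes the delegate run at most once, so even if the row-position supplier is invoked repeatedly, the file is only scanned once via `file::newStream`. A simplified, non-thread-safe sketch of that behavior using plain `java.util.function.Supplier` (not Guava's actual implementation):

   ```java
   import java.util.function.Supplier;

   public class MemoizeDemo {
       static int calls = 0;

       // Simplified sketch of Suppliers.memoize: the delegate runs at most
       // once; later calls return the cached value. (Guava's real version
       // is additionally thread-safe.)
       static <T> Supplier<T> memoize(Supplier<T> delegate) {
           return new Supplier<T>() {
               private T value;
               private boolean computed;

               @Override
               public T get() {
                   if (!computed) {
                       value = delegate.get();
                       computed = true;
                   }
                   return value;
               }
           };
       }

       public static void main(String[] args) {
           Supplier<Long> expensive = () -> { calls++; return 42L; };
           Supplier<Long> cached = memoize(expensive);
           cached.get();
           cached.get();
           System.out.println(calls);        // delegate invoked only once
           System.out.println(cached.get()); // cached value
       }
   }
   ```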



##########
core/src/main/java/org/apache/iceberg/avro/Avro.java:
##########
@@ -683,23 +698,34 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
       return this;
     }
 
+    @SuppressWarnings("unchecked")
     public <D> AvroIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
-      Function<Schema, DatumReader<?>> readerFunc;
+
+      if (null == nameMapping) {
+        this.nameMapping = MappingUtil.create(schema);
+      }
+
+      DatumReader<D> reader;

Review Comment:
   I always like to make these `final` so you're sure that no branch slips through without assigning it.
   ```suggestion
         final DatumReader<D> reader;
   ```
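   To illustrate the point (hypothetical `pickReader` method, not code from the PR): with a `final` local, the compiler enforces that every path assigns the variable exactly once, so a branch that forgets the assignment, or one that could assign twice, fails compilation instead of silently falling through.

   ```java
   public class FinalLocalDemo {
       // Hypothetical example: `final` forces exactly one assignment per path.
       static String pickReader(boolean resolving) {
           final String reader;
           if (resolving) {
               reader = "ResolvingReadBuilder";
           } else {
               reader = "ReadBuilder";
           }
           // Adding a path that skips or repeats the assignment is a compile error.
           return reader;
       }

       public static void main(String[] args) {
           System.out.println(pickReader(true));
           System.out.println(pickReader(false));
       }
   }
   ```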



##########
core/src/main/java/org/apache/iceberg/avro/ValueReaders.java:
##########
@@ -381,6 +550,45 @@ public BigDecimal read(Decoder decoder, Object ignored) throws IOException {
       byte[] bytes = bytesReader.read(decoder, null);
       return new BigDecimal(new BigInteger(bytes), scale);
     }
+
+    @Override
+    public void skip(Decoder decoder) throws IOException {
+      bytesReader.skip(decoder);
+    }
+  }
+
+  private static class RequiredOptionReader implements ValueReader<Object> {

Review Comment:
   Why do we need this next to the UnionReader?



##########
core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java:
##########
@@ -69,62 +99,107 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
 
   @Override
   public T read(T reuse, Decoder decoder) throws IOException {
-    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
+    return reader.read(decoder, reuse);
   }
 
-  private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-    private final ClassLoader loader;
+  private class ResolvingReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Type, Schema> avroSchemas;
 
-    private ReadBuilder(ClassLoader loader) {
-      this.loader = loader;
+    private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {
+      this.avroSchemas = AvroSchemaUtil.convertTypes(expectedType, rootName);
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-      try {
-        Class<?> recordClass =
-            DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked();
-        if (IndexedRecord.class.isAssignableFrom(recordClass)) {
-          return ValueReaders.record(fields, (Class<? extends IndexedRecord>) recordClass, record);
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
+      Types.StructType expected = partner != null ? partner.asStructType() : null;
+      Map<Integer, Integer> idToPos = idToPos(expected);
+
+      List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+      List<Schema.Field> fileFields = record.getFields();
+      for (int pos = 0; pos < fileFields.size(); pos += 1) {
+        Schema.Field field = fileFields.get(pos);
+        ValueReader<?> fieldReader = fieldResults.get(pos);
+        Integer fieldId = AvroSchemaUtil.fieldId(field);
+        Integer projectionPos = idToPos.remove(fieldId);
+
+        Object constant = idToConstant.get(fieldId);
+        if (projectionPos != null && constant != null) {
+          readPlan.add(Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
+        } else {
+          readPlan.add(Pair.of(projectionPos, fieldReader));
+        }
+      }
+
+      // handle any expected columns that are not in the data file
+      for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+        int fieldId = idAndPos.getKey();
+        int pos = idAndPos.getValue();
+
+        Object constant = idToConstant.get(fieldId);
+        Types.NestedField field = expected.field(fieldId);
+        if (constant != null) {
+          readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+        } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {

Review Comment:
   Do we need to codify these cases? They should just follow the Iceberg spec like any other Avro file.
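   For readers following along, the matching logic in the diff above can be reduced to a toy model (made-up field ids and positions, strings standing in for `ValueReader`s): file fields are matched to projected positions by field id, unmatched file fields are read-and-skipped, and projected ids left over after the loop are columns missing from the data file, which get constant readers.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class ReadPlanDemo {
       public static void main(String[] args) {
           // Projected schema: field id -> position in the output row.
           Map<Integer, Integer> idToPos = new LinkedHashMap<>();
           idToPos.put(1, 0); // id 1 -> output position 0
           idToPos.put(3, 1); // id 3 -> output position 1 (not in the file)

           // File schema: ordered field ids; id 2 is not projected.
           List<Integer> fileFieldIds = List.of(1, 2);

           List<String> plan = new ArrayList<>();
           for (int id : fileFieldIds) {
               Integer pos = idToPos.remove(id);
               // null position means the field is decoded-and-discarded.
               plan.add("file field " + id + " -> " + (pos == null ? "skip" : "pos " + pos));
           }
           // Whatever is left in idToPos is missing from the file.
           for (Map.Entry<Integer, Integer> left : idToPos.entrySet()) {
               plan.add("missing field " + left.getKey() + " -> constant at pos " + left.getValue());
           }
           plan.forEach(System.out::println);
       }
   }
   ```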



##########
core/src/main/java/org/apache/iceberg/avro/ValueReader.java:
##########
@@ -23,4 +23,8 @@
 
 public interface ValueReader<T> {
   T read(Decoder decoder, Object reuse) throws IOException;
+
+  default void skip(Decoder decoder) throws IOException {

Review Comment:
   This is nice, since this allows us to efficiently skip over the metadata when using the new block-encoder that will be part of the new Avro release: https://github.com/apache/iceberg/blob/6c344dbbbe0fc1e373f588a1bbc4c64029cc7f8c/core/src/main/java/org/apache/iceberg/avro/AvroIO.java#L166
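   The default-plus-override pattern behind `skip` can be sketched outside Avro (hypothetical `SketchReader` interface over a plain `DataInputStream` instead of Avro's `Decoder`): the default implementation decodes the value and discards it, while a specialized reader can jump over the bytes without materializing anything.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.DataInputStream;
   import java.io.IOException;

   // Hypothetical stand-in for ValueReader: default skip = read and discard.
   interface SketchReader<T> {
       T read(DataInputStream in) throws IOException;

       default void skip(DataInputStream in) throws IOException {
           read(in); // fallback: decode the value, then throw it away
       }
   }

   public class SkipDemo {
       // Length-prefixed byte arrays, with an override that avoids copying.
       static final SketchReader<byte[]> BYTES = new SketchReader<byte[]>() {
           @Override
           public byte[] read(DataInputStream in) throws IOException {
               byte[] buf = new byte[in.readInt()];
               in.readFully(buf);
               return buf;
           }

           @Override
           public void skip(DataInputStream in) throws IOException {
               in.skipBytes(in.readInt()); // jump over the payload entirely
           }
       };

       public static void main(String[] args) throws IOException {
           byte[] encoded = {0, 0, 0, 3, 'a', 'b', 'c', 0, 0, 0, 1, 'z'};
           DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
           BYTES.skip(in); // skip "abc" without decoding it
           System.out.println(new String(BYTES.read(in)));
       }
   }
   ```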



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

