Fokko commented on code in PR #9366:
URL: https://github.com/apache/iceberg/pull/9366#discussion_r1436375930
##########
core/src/main/java/org/apache/iceberg/avro/AvroIterable.java:
##########

@@ -78,7 +79,8 @@ public CloseableIterator<D> iterator() {
     if (start != null) {
       if (reader instanceof SupportsRowPosition) {
         ((SupportsRowPosition) reader)
-            .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
+            .setRowPositionSupplier(
+                Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));

Review Comment:
   Why the `memoize`? Are we reading the same file multiple times?

##########
core/src/main/java/org/apache/iceberg/avro/Avro.java:
##########

@@ -683,23 +698,34 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
     return this;
   }

+  @SuppressWarnings("unchecked")
   public <D> AvroIterable<D> build() {
     Preconditions.checkNotNull(schema, "Schema is required");
-    Function<Schema, DatumReader<?>> readerFunc;
+
+    if (null == nameMapping) {
+      this.nameMapping = MappingUtil.create(schema);
+    }
+
+    DatumReader<D> reader;

Review Comment:
   I always like to make these `final` so you're sure that no branch skips the assignment.
   ```suggestion
       final DatumReader<D> reader;
   ```

##########
core/src/main/java/org/apache/iceberg/avro/ValueReaders.java:
##########

@@ -381,6 +550,45 @@ public BigDecimal read(Decoder decoder, Object ignored) throws IOException {
     byte[] bytes = bytesReader.read(decoder, null);
     return new BigDecimal(new BigInteger(bytes), scale);
   }
+
+  @Override
+  public void skip(Decoder decoder) throws IOException {
+    bytesReader.skip(decoder);
+  }
+}
+
+private static class RequiredOptionReader implements ValueReader<Object> {

Review Comment:
   Why do we need this next to the `UnionReader`?
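Fokko's question about `Suppliers.memoize` hinges on whether the supplier can be invoked more than once. As a minimal sketch of what memoization buys here, using a hand-rolled supplier rather than Guava's so it runs with the JDK alone (the names and the counter are illustrative, not Iceberg code):

```java
import java.util.function.Supplier;

public class MemoizeSketch {
    // A tiny stand-in for Guava's Suppliers.memoize: the delegate runs at most
    // once, and every later get() returns the cached result.
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private boolean computed;
            private T value;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = delegate.get(); // e.g. an expensive file scan
                    computed = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Pretend this supplier seeks through a file to find a row position.
        Supplier<Long> rowPos = memoize(() -> {
            calls[0]++;
            return 42L;
        });

        System.out.println(rowPos.get());
        System.out.println(rowPos.get()); // cached, no second scan
        System.out.println("calls=" + calls[0]);
    }
}
```

If the row-position supplier were only ever called once, the memoize wrapper would be pure overhead, which is exactly the point of the review question.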
##########
core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java:
##########

@@ -69,62 +99,107 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {

   @Override
   public T read(T reuse, Decoder decoder) throws IOException {
-    return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
+    return reader.read(decoder, reuse);
   }

-  private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
-    private final ClassLoader loader;
+  private class ResolvingReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Type, Schema> avroSchemas;

-    private ReadBuilder(ClassLoader loader) {
-      this.loader = loader;
+    private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {
+      this.avroSchemas = AvroSchemaUtil.convertTypes(expectedType, rootName);
     }

     @Override
-    @SuppressWarnings("unchecked")
-    public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
-      try {
-        Class<?> recordClass =
-            DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked();
-        if (IndexedRecord.class.isAssignableFrom(recordClass)) {
-          return ValueReaders.record(fields, (Class<? extends IndexedRecord>) recordClass, record);
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
+      Types.StructType expected = partner != null ? partner.asStructType() : null;
+      Map<Integer, Integer> idToPos = idToPos(expected);
+
+      List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
+      List<Schema.Field> fileFields = record.getFields();
+      for (int pos = 0; pos < fileFields.size(); pos += 1) {
+        Schema.Field field = fileFields.get(pos);
+        ValueReader<?> fieldReader = fieldResults.get(pos);
+        Integer fieldId = AvroSchemaUtil.fieldId(field);
+        Integer projectionPos = idToPos.remove(fieldId);
+
+        Object constant = idToConstant.get(fieldId);
+        if (projectionPos != null && constant != null) {
+          readPlan.add(Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
+        } else {
+          readPlan.add(Pair.of(projectionPos, fieldReader));
+        }
+      }
+
+      // handle any expected columns that are not in the data file
+      for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
+        int fieldId = idAndPos.getKey();
+        int pos = idAndPos.getValue();
+
+        Object constant = idToConstant.get(fieldId);
+        Types.NestedField field = expected.field(fieldId);
+        if (constant != null) {
+          readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
+        } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {

Review Comment:
   Do we need to codify these cases? They should just follow the Iceberg spec like any other Avro file.

##########
core/src/main/java/org/apache/iceberg/avro/ValueReader.java:
##########

@@ -23,4 +23,8 @@ public interface ValueReader<T> {
   T read(Decoder decoder, Object reuse) throws IOException;
+
+  default void skip(Decoder decoder) throws IOException {

Review Comment:
   This is nice, since it allows us to efficiently skip over the metadata when using the new block encoder that will be part of the new Avro release:
   https://github.com/apache/iceberg/blob/6c344dbbbe0fc1e373f588a1bbc4c64029cc7f8c/core/src/main/java/org/apache/iceberg/avro/AvroIO.java#L166

--
This is an automated message from the Apache Git Service.
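The `skip` default being discussed reads and discards a value, which is always correct but can do needless decoding; concrete readers can override it to advance the stream cheaply. A self-contained sketch of that pattern, using a simplified reader interface over a `ByteBuffer` as a stand-in for Iceberg's `ValueReader`/`Decoder` pair (not the actual classes):

```java
import java.nio.ByteBuffer;

public class SkipSketch {
    // Simplified stand-in for a value reader: skip() defaults to "read and
    // throw the value away", so every reader gets a correct skip for free.
    interface ValueReader<T> {
        T read(ByteBuffer in);

        default void skip(ByteBuffer in) {
            read(in); // fallback: decode the value just to advance the stream
        }
    }

    // A reader of length-prefixed byte strings that overrides skip() to move
    // the position forward without allocating or copying the payload.
    static class BytesReader implements ValueReader<byte[]> {
        @Override
        public byte[] read(ByteBuffer in) {
            byte[] bytes = new byte[in.getInt()];
            in.get(bytes);
            return bytes;
        }

        @Override
        public void skip(ByteBuffer in) {
            int len = in.getInt();            // read only the length prefix
            in.position(in.position() + len); // jump past the payload
        }
    }

    public static void main(String[] args) {
        // Encode two length-prefixed values: a 3-byte string, then an empty one.
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(3).put(new byte[] {1, 2, 3}).putInt(0);
        buf.flip();

        BytesReader reader = new BytesReader();
        reader.skip(buf); // skips the first value without materializing it
        System.out.println("len=" + buf.getInt());
    }
}
```

This mirrors why the default is useful for the block encoding Fokko mentions: a reader can jump over columns it does not need instead of decoding them.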
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org