mark-bathori commented on code in PR #7421: URL: https://github.com/apache/nifi/pull/7421#discussion_r1342539976
########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java: ########## @@ -433,11 +494,11 @@ private static Record setupChoiceTestRecord() { } @DisabledOnOs(WINDOWS) - @ParameterizedTest Review Comment: Please don't remove `@ParameterizedTest` in this test class since it is testing different writers for different format types. ########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java: ########## @@ -267,6 +278,7 @@ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile final WriteResult result = taskWriter.complete(); appendDataFiles(context, flowFile, table, result); + taskWriter.close(); Review Comment: This is not needed. The `complete()` method closes the task writer. ########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java: ########## @@ -156,8 +174,25 @@ public DataType fieldPartner(DataType dataType, int fieldId, String name) { final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType; final Optional<String> mappedFieldName = recordType.getNameMapping(name); - Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name)); - + if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) { + Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name)); + } + if (mappedFieldName.isEmpty()) { + if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) { + if (logger != null) { Review Comment: If the constructor that passes a null logger is removed, this null check can be removed too. 
########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java: ########## @@ -46,19 +48,29 @@ public class IcebergRecordConverter { private final DataConverter<Record, GenericRecord> converter; + public final UnmatchedColumnBehavior unmatchedColumnBehavior; + public ComponentLog logger; + public GenericRecord convert(Record record) { return converter.convert(record); } - @SuppressWarnings("unchecked") public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) { - this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat); + this(schema, recordSchema, fileFormat, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, null); Review Comment: As far as I can see, this constructor was added only so that the unit tests can use a default null logger. I think it should be removed; instead, the unit tests should create a logger and pass it as a parameter to the other constructor. ########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java: ########## @@ -265,9 +267,14 @@ static class RecordConverter extends DataConverter<Record, GenericRecord> { for (DataConverter<?, ?> converter : converters) { final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName()); - final RecordField field = recordField.get(); - // creates a record field accessor for every data converter - getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable())); + if (recordField.isEmpty()) { + Types.NestedField missingField = schema.field(converter.getTargetFieldName()); Review Comment: This can be final. 
########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java: ########## @@ -290,4 +297,54 @@ private <S, T> T convert(Record record, DataConverter<S, T> converter) { return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record)); } } + + public static DataType convertSchemaTypeToDataType(Type schemaType) { Review Comment: I think it would be better to throw an exception for an unmatched schema type than to return null. ########## nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java: ########## @@ -504,9 +609,80 @@ public void testPrimitives(FileFormat format) throws IOException { @DisabledOnOs(WINDOWS) @ParameterizedTest @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) - public void testCompatiblePrimitives(FileFormat format) throws IOException { + public void testPrimitivesMissingRequiredFields(FileFormat format) { + RecordSchema nifiSchema = getPrimitivesSchemaMissingFields(); + Record record = setupPrimitivesTestRecordMissingFields(); Review Comment: This variable is unused. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org