mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1342539976


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -433,11 +494,11 @@ private static Record setupChoiceTestRecord() {
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest

Review Comment:
   Please don't remove `@ParameterizedTest` from the tests in this class; they 
use it to exercise the writers for each file format.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java:
##########
@@ -267,6 +278,7 @@ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile
 
             final WriteResult result = taskWriter.complete();
             appendDataFiles(context, flowFile, table, result);
+            taskWriter.close();

Review Comment:
   This is not needed. The `complete()` method closes the task writer.
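
For illustration, here is a minimal sketch of that pattern using a hypothetical stand-in writer (`SketchTaskWriter` is not one of Iceberg's `TaskWriter` classes, and its behavior is an assumption). When `complete()` closes the writer itself, a following `close()` call has nothing left to do:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task writer; names and behavior are assumptions for illustration only.
final class SketchTaskWriter implements AutoCloseable {
    private final List<String> buffered = new ArrayList<>();
    private boolean closed;
    private int closeCalls; // counts only the calls that actually released resources

    void write(String row) {
        if (closed) {
            throw new IllegalStateException("Writer is already closed");
        }
        buffered.add(row);
    }

    // Closes the writer first, then returns what was written.
    List<String> complete() {
        close();
        return new ArrayList<>(buffered);
    }

    @Override
    public void close() {
        if (closed) {
            return; // already closed by complete(); nothing left to release
        }
        closed = true;
        closeCalls++;
    }

    boolean isClosed() {
        return closed;
    }

    int effectiveCloseCalls() {
        return closeCalls;
    }
}
```

In this sketch, calling `close()` after `complete()` returns immediately, which is why the extra call in the diff adds nothing.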



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -156,8 +174,25 @@ public DataType fieldPartner(DataType dataType, int fieldId, String name) {
             final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
 
             final Optional<String> mappedFieldName = recordType.getNameMapping(name);
-            Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
-
+            if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
+            }
+            if (mappedFieldName.isEmpty()) {
+                if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
+                    if (logger != null) {

Review Comment:
   If the constructor that passes a null logger is removed, this null check can 
be removed too.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -46,19 +48,29 @@
 public class IcebergRecordConverter {
 
     private final DataConverter<Record, GenericRecord> converter;
+    public final UnmatchedColumnBehavior unmatchedColumnBehavior;
+    public ComponentLog logger;
+
     public GenericRecord convert(Record record) {
         return converter.convert(record);
     }
 
-    @SuppressWarnings("unchecked")
     public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
-        this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
+        this(schema, recordSchema, fileFormat, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, null);

Review Comment:
   As far as I can see, this constructor was added only so that unit tests get 
a default null logger. I think it should be removed; instead, create a logger in 
the unit tests and pass it as a parameter to the other constructor.
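
A minimal sketch of the idea, using hypothetical stand-ins (`SketchLog`, `CapturingLog` and `SketchConverter` are illustrative names only; the real code would use NiFi's `ComponentLog` and whatever test logger the project prefers). With a single constructor that requires a logger, the test supplies its own and no null check is needed where the logger is used:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the logger type; not NiFi's ComponentLog.
interface SketchLog {
    void warn(String message);
}

// Test-side logger that records warnings so a test can assert on them.
final class CapturingLog implements SketchLog {
    final List<String> warnings = new ArrayList<>();

    @Override
    public void warn(String message) {
        warnings.add(message);
    }
}

// Hypothetical stand-in for the converter.
final class SketchConverter {
    private final SketchLog logger;

    // Single constructor: the logger is required, so it is never null afterwards.
    SketchConverter(SketchLog logger) {
        this.logger = Objects.requireNonNull(logger, "logger must not be null");
    }

    void reportUnmatchedColumn(String name) {
        // No "logger != null" guard needed here.
        logger.warn("Cannot find field with name '" + name + "' in the record schema");
    }
}
```

A test would then construct `new SketchConverter(new CapturingLog())` and could also assert on the captured warnings.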



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -265,9 +267,14 @@ static class RecordConverter extends DataConverter<Record, GenericRecord> {
 
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName());
-                final RecordField field = recordField.get();
-                // creates a record field accessor for every data converter
-                getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
+                if (recordField.isEmpty()) {
+                    Types.NestedField missingField = schema.field(converter.getTargetFieldName());

Review Comment:
   This can be final.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -290,4 +297,54 @@ private <S, T> T convert(Record record, DataConverter<S, T> converter) {
             return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
         }
     }
+
+    public static DataType convertSchemaTypeToDataType(Type schemaType) {

Review Comment:
   I think it would be better to throw an exception for an unsupported schema 
type than to return null.
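
As a rough sketch of what that could look like, with hypothetical stand-in enums rather than the real Iceberg `Type` and NiFi `DataType` classes (the names and the chosen exception type are assumptions):

```java
// Hypothetical stand-ins; not the real org.apache.iceberg or NiFi record types.
final class SketchTypeMapping {

    enum SchemaTypeId { BOOLEAN, INTEGER, LONG, STRING, FIXED }

    enum FieldType { BOOLEAN, INT, LONG, STRING }

    private SketchTypeMapping() {
    }

    // Maps a schema type to a record field type; fails fast on anything unmapped.
    static FieldType toFieldType(SchemaTypeId typeId) {
        switch (typeId) {
            case BOOLEAN:
                return FieldType.BOOLEAN;
            case INTEGER:
                return FieldType.INT;
            case LONG:
                return FieldType.LONG;
            case STRING:
                return FieldType.STRING;
            default:
                // Throwing here surfaces the problem at the mapping site
                // instead of as a NullPointerException somewhere downstream.
                throw new IllegalArgumentException("Unsupported schema type: " + typeId);
        }
    }
}
```

Callers then never have to handle a null result, and an unsupported type is reported by name.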



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -504,9 +609,80 @@ public void testPrimitives(FileFormat format) throws IOException {
     @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException {
+    public void testPrimitivesMissingRequiredFields(FileFormat format) {
+        RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
+        Record record = setupPrimitivesTestRecordMissingFields();

Review Comment:
   This local variable is unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
