This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dd3059a254 NIFI-15568 Fix Timestamp Partitioning in PutIcebergRecord (#10996)
2dd3059a254 is described below

commit 2dd3059a254c919f4dd6f51e30c6c41e1068e602
Author: David Handermann <[email protected]>
AuthorDate: Thu Mar 12 13:16:31 2026 -0500

    NIFI-15568 Fix Timestamp Partitioning in PutIcebergRecord (#10996)
    
    * NIFI-15568 Fixed Timestamp Partitioning in PutIcebergRecord
    - Added conditional conversion of partition key fields for ParquetIcebergWriter
    - Added conditional conversion of java.sql types to java.time types for PutIcebergRecord
---
 .../parquet/io/ParquetPartitionedWriter.java       |   8 +-
 .../iceberg/parquet/io/PartitionKeyRecord.java     | 121 +++++++++++++++++
 .../iceberg/parquet/ParquetIcebergWriterTest.java  |  92 +++++++++++--
 .../iceberg/parquet/io/PartitionKeyRecordTest.java | 147 +++++++++++++++++++++
 .../processors/iceberg/record/DelegatedRecord.java |   7 +-
 .../processors/iceberg/record/RecordConverter.java | 101 ++++++++++++++
 .../iceberg/record/DelegatedRecordTest.java        | 138 +++++++++++++++++++
 7 files changed, 600 insertions(+), 14 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
index e5bc9a18657..4f1a3786abc 100644
--- 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.types.Types;
 
 /**
  * Parquet implementation of Partition Writer with Partition Key derived from 
configured Schema definition
@@ -32,6 +33,7 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 public class ParquetPartitionedWriter extends PartitionedFanoutWriter<Record> {
 
     private final PartitionKey partitionKey;
+    private final PartitionKeyRecord partitionKeyRecord;
 
     public ParquetPartitionedWriter(
             final PartitionSpec spec,
@@ -43,11 +45,15 @@ public class ParquetPartitionedWriter extends 
PartitionedFanoutWriter<Record> {
     ) {
         super(spec, FileFormat.PARQUET, appenderFactory, fileFactory, io, 
targetFileSize);
         this.partitionKey = new PartitionKey(spec, schema);
+        final Types.StructType struct = schema.asStruct();
+        this.partitionKeyRecord = new PartitionKeyRecord(struct);
     }
 
     @Override
     protected PartitionKey partition(final Record record) {
-        partitionKey.partition(record);
+        // Partition Key Record handles conversion of selected Field Types
+        partitionKeyRecord.wrap(record);
+        partitionKey.partition(partitionKeyRecord);
         return partitionKey;
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
new file mode 100644
index 00000000000..1bde17efb0a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Partition Key Record Wrapper based on Apache Iceberg InternalRecordWrapper 
to avoid iceberg-data dependency
+ */
+class PartitionKeyRecord implements StructLike {
+    private final Function<Object, Object>[] converters;
+
+    private StructLike wrapped = null;
+
+    @SuppressWarnings("unchecked")
+    PartitionKeyRecord(final Types.StructType structType) {
+        final List<Types.NestedField> fields = structType.fields();
+
+        converters = fields.stream()
+                .map(Types.NestedField::type)
+                .map(PartitionKeyRecord::getConverter)
+                .toArray(length -> (Function<Object, Object>[]) 
Array.newInstance(Function.class, length));
+    }
+
+    @Override
+    public int size() {
+        return wrapped.size();
+    }
+
+    @Override
+    public <T> T get(final int position, final Class<T> javaClass) {
+        final T processed;
+
+        final Function<Object, Object> converter = converters[position];
+        if (converter == null) {
+            processed = wrapped.get(position, javaClass);
+        } else {
+            final Object value = wrapped.get(position, Object.class);
+            if (value == null) {
+                processed = null;
+            } else {
+                final Object converted = converter.apply(value);
+                processed = javaClass.cast(converted);
+            }
+        }
+
+        return processed;
+    }
+
+    @Override
+    public <T> void set(final int position, final T value) {
+        throw new UnsupportedOperationException("Set method not supported");
+    }
+
+    PartitionKeyRecord wrap(final StructLike wrapped) {
+        this.wrapped = wrapped;
+        return this;
+    }
+
+    private static Function<Object, Object> getConverter(final Type fieldType) 
{
+        final Type.TypeID typeId = fieldType.typeId();
+
+        final Function<Object, Object> converter;
+
+        if (Type.TypeID.TIMESTAMP_NANO == typeId) {
+            final Types.TimestampNanoType timestampNanoType = 
(Types.TimestampNanoType) fieldType;
+            if (timestampNanoType.shouldAdjustToUTC()) {
+                converter = dateTime -> 
DateTimeUtil.nanosFromTimestamptz((OffsetDateTime) dateTime);
+            } else {
+                converter = dateTime -> 
DateTimeUtil.nanosFromTimestamp((LocalDateTime) dateTime);
+            }
+        } else if (Type.TypeID.TIMESTAMP == typeId) {
+            final Types.TimestampType timestampType = (Types.TimestampType) 
fieldType;
+            if (timestampType.shouldAdjustToUTC()) {
+                converter = dateTime -> 
DateTimeUtil.microsFromTimestamptz((OffsetDateTime) dateTime);
+            } else {
+                converter = dateTime -> 
DateTimeUtil.microsFromTimestamp((LocalDateTime) dateTime);
+            }
+        } else if (Type.TypeID.DATE == typeId) {
+            converter = date -> DateTimeUtil.daysFromDate((LocalDate) date);
+        } else if (Type.TypeID.TIME == typeId) {
+            converter = time -> DateTimeUtil.microsFromTime((LocalTime) time);
+        } else if (Type.TypeID.FIXED == typeId) {
+            converter = bytes -> ByteBuffer.wrap((byte[]) bytes);
+        } else if (Type.TypeID.STRUCT == typeId) {
+            final Types.StructType fieldStructType = fieldType.asStructType();
+            final PartitionKeyRecord partitionKeyRecord = new 
PartitionKeyRecord(fieldStructType);
+            converter = struct -> partitionKeyRecord.wrap((StructLike) struct);
+        } else {
+            converter = null;
+        }
+
+        return converter;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
index e06ea7138e9..636b0ba091f 100644
--- 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
@@ -18,8 +18,10 @@ package org.apache.nifi.services.iceberg.parquet;
 
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -29,6 +31,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.services.iceberg.IcebergRowWriter;
 import org.apache.nifi.util.NoOpProcessor;
@@ -41,11 +44,16 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -60,6 +68,14 @@ class ParquetIcebergWriterTest {
 
     private static final String FIRST_FIELD_VALUE = "value";
 
+    private static final int CREATED_FIELD_ID = 1;
+
+    private static final String CREATED_FIELD_NAME = "created";
+
+    private static final LocalDateTime CREATED_FIELD_VALUE = LocalDateTime.of(
+            LocalDate.ofEpochDay(1), LocalTime.ofSecondOfDay(0)
+    );
+
     private ParquetIcebergWriter parquetIcebergWriter;
 
     private TestRunner runner;
@@ -67,9 +83,6 @@ class ParquetIcebergWriterTest {
     @Mock
     private Table table;
 
-    @Mock
-    private PartitionSpec spec;
-
     @Mock
     private FileIO io;
 
@@ -103,8 +116,9 @@ class ParquetIcebergWriterTest {
 
         final Schema schema = getSchema();
         final InMemoryOutputFile outputFile = new InMemoryOutputFile();
-        setTable(schema, outputFile);
-        when(spec.isUnpartitioned()).thenReturn(true);
+        final PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+        setTable(schema, partitionSpec, outputFile);
+        
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
 
         final IcebergRowWriter rowWriter = 
parquetIcebergWriter.getRowWriter(table);
 
@@ -117,16 +131,76 @@ class ParquetIcebergWriterTest {
 
         final Schema schema = getSchema();
         final InMemoryOutputFile outputFile = new InMemoryOutputFile();
-        setTable(schema, outputFile);
-        when(spec.isUnpartitioned()).thenReturn(true);
+        final PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+        setTable(schema, partitionSpec, outputFile);
+        
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
+
+        final IcebergRowWriter rowWriter = 
parquetIcebergWriter.getRowWriter(table);
+        writeRow(schema, rowWriter);
+
+        final DataFile[] dataFiles = rowWriter.dataFiles();
+        final byte[] serialized = outputFile.toByteArray();
+        assertDataFilesFound(dataFiles, serialized);
+    }
+
+    @Test
+    void testWriteDataFilesPartitioned() throws IOException {
+        runner.enableControllerService(parquetIcebergWriter);
+
+        final Schema schema = getSchema();
+        final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+        final PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).identity(FIRST_FIELD_NAME).build();
+        setTable(schema, partitionSpec, outputFile);
+        when(locationProvider.newDataLocation(eq(partitionSpec), 
isA(StructLike.class), anyString())).thenReturn(LOCATION);
 
         final IcebergRowWriter rowWriter = 
parquetIcebergWriter.getRowWriter(table);
+        writeRow(schema, rowWriter);
 
+        final DataFile[] dataFiles = rowWriter.dataFiles();
+        final byte[] serialized = outputFile.toByteArray();
+        assertDataFilesFound(dataFiles, serialized);
+    }
+
+    @Test
+    void testWriteDataFilesPartitionedTimestamp() throws IOException {
+        runner.enableControllerService(parquetIcebergWriter);
+
+        final Types.NestedField firstNestedField = 
Types.NestedField.required(FIRST_FIELD_ID, FIRST_FIELD_NAME, 
Types.StringType.get());
+        final Types.NestedField createdNestedField = 
Types.NestedField.required(CREATED_FIELD_ID, CREATED_FIELD_NAME, 
Types.TimestampType.withoutZone());
+        final Schema schema = new Schema(firstNestedField, createdNestedField);
+        final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+        final PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).identity(CREATED_FIELD_NAME).build();
+
+        setTable(schema, partitionSpec, outputFile);
+        when(locationProvider.newDataLocation(eq(partitionSpec), 
isA(StructLike.class), anyString())).thenReturn(LOCATION);
+
+        final IcebergRowWriter rowWriter = 
parquetIcebergWriter.getRowWriter(table);
         final GenericRecord row = GenericRecord.create(schema);
         row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
+        row.setField(CREATED_FIELD_NAME, CREATED_FIELD_VALUE);
         rowWriter.write(row);
 
         final DataFile[] dataFiles = rowWriter.dataFiles();
+        final byte[] serialized = outputFile.toByteArray();
+        assertDataFilesFound(dataFiles, serialized);
+
+        final DataFile dataFile = dataFiles[0];
+        final StructLike partition = dataFile.partition();
+        assertInstanceOf(PartitionData.class, partition);
+        final PartitionData partitionData = (PartitionData) partition;
+        final Object partitionField = partitionData.get(0);
+
+        final long microsecondsExpected = 
DateTimeUtil.microsFromTimestamp(CREATED_FIELD_VALUE);
+        assertEquals(microsecondsExpected, partitionField);
+    }
+
+    private void writeRow(final Schema schema, final IcebergRowWriter 
rowWriter) throws IOException {
+        final GenericRecord row = GenericRecord.create(schema);
+        row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
+        rowWriter.write(row);
+    }
+
+    private void assertDataFilesFound(final DataFile[] dataFiles, final byte[] 
serialized) {
         assertNotNull(dataFiles);
         assertEquals(1, dataFiles.length);
 
@@ -135,7 +209,6 @@ class ParquetIcebergWriterTest {
         assertEquals(FileFormat.PARQUET, dataFile.format());
         assertEquals(1, dataFile.recordCount());
 
-        final byte[] serialized = outputFile.toByteArray();
         assertEquals(serialized.length, dataFile.fileSizeInBytes());
     }
 
@@ -144,13 +217,12 @@ class ParquetIcebergWriterTest {
         return new Schema(nestedField);
     }
 
-    private void setTable(final Schema schema, final OutputFile outputFile) {
+    private void setTable(final Schema schema, final PartitionSpec spec, final 
OutputFile outputFile) {
         when(table.schema()).thenReturn(schema);
         when(table.spec()).thenReturn(spec);
         when(table.io()).thenReturn(io);
         when(table.locationProvider()).thenReturn(locationProvider);
         when(table.encryption()).thenReturn(encryptionManager);
-        
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
         when(io.newOutputFile(eq(LOCATION))).thenReturn(outputFile);
         
when(encryptionManager.encrypt(eq(outputFile))).thenReturn(encryptedOutputFile);
         
when(encryptedOutputFile.encryptingOutputFile()).thenReturn(outputFile);
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
new file mode 100644
index 00000000000..a01dd71562a
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PartitionKeyRecordTest {
+
+    private static final LocalDateTime CREATED = 
LocalDateTime.of(LocalDate.ofEpochDay(0), LocalTime.ofSecondOfDay(0));
+    private static final OffsetDateTime CREATED_ZONED = 
OffsetDateTime.of(CREATED, ZoneOffset.UTC);
+    private static final LocalDate UPDATED = LocalDate.ofEpochDay(0);
+    private static final LocalTime STOPPED = LocalTime.ofSecondOfDay(300);
+
+    private static final String FIRST_FIELD_NAME = "firstField";
+
+    private static final int FIRST_FIELD_POSITION = 0;
+
+    @Test
+    void testTimestampNanoTypeConverted() {
+        final Record record = 
setFirstField(Types.TimestampNanoType.withoutZone(), CREATED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final long expected = DateTimeUtil.nanosFromTimestamp(CREATED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testTimestampNanoTypeZonedConverted() {
+        final Record record = 
setFirstField(Types.TimestampNanoType.withZone(), CREATED_ZONED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final long expected = DateTimeUtil.nanosFromTimestamptz(CREATED_ZONED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testTimestampTypeConverted() {
+        final Record record = setFirstField(Types.TimestampType.withoutZone(), 
CREATED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final long expected = DateTimeUtil.microsFromTimestamp(CREATED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testTimestampTypeZonedConverted() {
+        final Record record = setFirstField(Types.TimestampType.withZone(), 
CREATED_ZONED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final long expected = 
DateTimeUtil.microsFromTimestamptz(CREATED_ZONED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testDateTypeConverted() {
+        final Record record = setFirstField(Types.DateType.get(), UPDATED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final int expected = DateTimeUtil.daysFromDate(UPDATED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testTimeTypeConverted() {
+        final Record record = setFirstField(Types.TimeType.get(), STOPPED);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final long expected = DateTimeUtil.microsFromTime(STOPPED);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testFixedTypeConverted() {
+        final byte[] bytes = 
String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);
+
+        final Record record = 
setFirstField(Types.FixedType.ofLength(bytes.length), bytes);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        final ByteBuffer expected = ByteBuffer.wrap(bytes);
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    @Test
+    void testStringTypeUnchanged() {
+        final String expected = String.class.getName();
+
+        final Record record = setFirstField(Types.StringType.get(), expected);
+        final Object wrappedFirstField = getWrappedFirstField(record);
+
+        assertEquals(expected, wrappedFirstField);
+    }
+
+    private Object getWrappedFirstField(final Record record) {
+        final PartitionKeyRecord partitionKeyRecord = new 
PartitionKeyRecord(record.struct());
+        partitionKeyRecord.wrap(record);
+        return partitionKeyRecord.get(FIRST_FIELD_POSITION, Object.class);
+    }
+
+    private Record setFirstField(final Type firstFieldType, final Object 
firstField) {
+        final Schema schema = getSchema(firstFieldType);
+        final Record record = GenericRecord.create(schema);
+        record.set(FIRST_FIELD_POSITION, firstField);
+        return record;
+    }
+
+    private Schema getSchema(final Type firstFieldType) {
+        final Types.NestedField field = Types.NestedField.builder()
+                .ofType(firstFieldType)
+                .asRequired()
+                .withId(FIRST_FIELD_POSITION)
+                .withName(FIRST_FIELD_NAME)
+                .build();
+        final List<Types.NestedField> fields = List.of(field);
+        return new Schema(fields);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
index c9ab79c4287..3267cc32214 100644
--- 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.RecordField;
 
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -37,7 +38,7 @@ public class DelegatedRecord implements Record {
             final org.apache.nifi.serialization.record.Record record,
             final Types.StructType struct
     ) {
-        this.record = Objects.requireNonNull(record);
+        this.record = 
RecordConverter.getConvertedRecord(Objects.requireNonNull(record));
         this.struct = Objects.requireNonNull(struct);
     }
 
@@ -91,14 +92,14 @@ public class DelegatedRecord implements Record {
     }
 
     /**
-     * Create and return a copy of the Record
+     * Create and return a copy of the Record with new Map of fields and values
      *
      * @param overrides Fields and values to override in the copied Record
      * @return Copy of the Record
      */
     @Override
     public Record copy(final Map<String, Object> overrides) {
-        final Map<String, Object> values = record.toMap();
+        final Map<String, Object> values = new LinkedHashMap<>(record.toMap());
         values.putAll(overrides);
         final MapRecord mapRecord = new MapRecord(record.getSchema(), values);
         return new DelegatedRecord(mapRecord, struct);
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
new file mode 100644
index 00000000000..b84159d4af0
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Record Converter handles translating field values to types compatible with 
Apache Iceberg Records
+ */
+class RecordConverter {
+
+    private static final Set<RecordFieldType> CONVERSION_REQUIRED_FIELD_TYPES 
= Set.of(
+            RecordFieldType.TIMESTAMP,
+            RecordFieldType.DATE,
+            RecordFieldType.TIME
+    );
+
+    /**
+     * Get Converted Record with conditional handling for field values 
requiring translation
+     *
+     * @param inputRecord Input Record to be converted
+     * @return Input Record or new Record with converted field values
+     */
+    static Record getConvertedRecord(final Record inputRecord) {
+        final Record convertedRecord;
+
+        final RecordSchema recordSchema = inputRecord.getSchema();
+        if (isConversionRequired(recordSchema)) {
+            final Map<String, Object> values = inputRecord.toMap();
+            convertedRecord = getConvertedRecord(recordSchema, values);
+        } else {
+            convertedRecord = inputRecord;
+        }
+
+        return convertedRecord;
+    }
+
+    private static Record getConvertedRecord(final RecordSchema recordSchema, 
final Map<String, Object> values) {
+        final Map<String, Object> convertedValues = new LinkedHashMap<>();
+
+        for (final Map.Entry<String, Object> entry : values.entrySet()) {
+            final String field = entry.getKey();
+            final Object value = entry.getValue();
+            final Object converted = getConvertedValue(value);
+            convertedValues.put(field, converted);
+        }
+
+        return new MapRecord(recordSchema, convertedValues);
+    }
+
+    private static Object getConvertedValue(final Object value) {
+        return switch (value) {
+            // Convert java.sql types to corresponding java.time types for 
Apache Iceberg
+            case Timestamp timestamp -> timestamp.toLocalDateTime();
+            case Date date -> date.toLocalDate();
+            case Time time -> time.toLocalTime();
+            case null, default -> value;
+        };
+    }
+
+    private static boolean isConversionRequired(final RecordSchema 
recordSchema) {
+        final List<RecordField> fields = recordSchema.getFields();
+
+        for (final RecordField field : fields) {
+            final DataType dataType = field.getDataType();
+            final RecordFieldType recordFieldType = dataType.getFieldType();
+            if (CONVERSION_REQUIRED_FIELD_TYPES.contains(recordFieldType)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
new file mode 100644
index 00000000000..7e43a5c4448
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link DelegatedRecord} covering copying, named and positional field access,
+ * and the java.time values returned for fields populated with java.sql values.
+ */
+class DelegatedRecordTest {
+
+    private static final String LABEL_FIELD = "label";
+    private static final String LABEL = "delegated-record";
+    private static final String LABEL_MODIFIED = "modified-record";
+
+    private static final String CREATED_FIELD = "created";
+    private static final String UPDATED_FIELD = "updated";
+    private static final String STOPPED_FIELD = "stopped";
+    private static final Timestamp CREATED = Timestamp.valueOf("2026-01-01 12:30:45.123456789");
+    private static final LocalDateTime CREATED_CONVERTED = CREATED.toLocalDateTime();
+    private static final Date UPDATED = Date.valueOf("2026-02-03");
+    private static final LocalDate UPDATED_CONVERTED = UPDATED.toLocalDate();
+    private static final Time STOPPED = Time.valueOf("23:30:45");
+    private static final LocalTime STOPPED_CONVERTED = STOPPED.toLocalTime();
+
+    /**
+     * Verify size, copy equality, and hash code for a record without fields.
+     */
+    @Test
+    void testCopyEmptyRecord() {
+        final List<RecordField> recordFields = List.of();
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        final Record record = new MapRecord(recordSchema, new LinkedHashMap<>());
+
+        final Types.StructType structType = Types.StructType.of();
+        final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType);
+
+        assertEquals(recordSchema.getFieldCount(), delegatedRecord.size());
+
+        final org.apache.iceberg.data.Record copiedRecord = delegatedRecord.copy();
+        assertEquals(delegatedRecord, copiedRecord);
+
+        // Wrapped record hash code is the expected value and the delegated record hash code is the actual value
+        assertEquals(record.hashCode(), delegatedRecord.hashCode());
+    }
+
+    /**
+     * Verify reading and writing a String field by name and by position.
+     */
+    @Test
+    void testSetGetStringField() {
+        final RecordSchema recordSchema = new SimpleRecordSchema(
+                List.of(
+                        new RecordField(LABEL_FIELD, RecordFieldType.STRING.getDataType())
+                )
+        );
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put(LABEL_FIELD, LABEL);
+
+        final Record record = new MapRecord(recordSchema, values);
+
+        final Types.StructType structType = Types.StructType.of();
+        final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType);
+
+        final Types.StructType recordStruct = delegatedRecord.struct();
+        assertEquals(structType, recordStruct);
+        assertEquals(recordSchema.getFieldCount(), delegatedRecord.size());
+
+        final Object field = delegatedRecord.getField(LABEL_FIELD);
+        assertEquals(LABEL, field);
+
+        final Object firstField = delegatedRecord.get(0);
+        assertEquals(LABEL, firstField);
+
+        final String firstStringField = delegatedRecord.get(0, String.class);
+        assertEquals(LABEL, firstStringField);
+
+        // Modify by name and then revert by position to exercise both setters
+        delegatedRecord.setField(LABEL_FIELD, LABEL_MODIFIED);
+        final Object fieldModified = delegatedRecord.getField(LABEL_FIELD);
+        assertEquals(LABEL_MODIFIED, fieldModified);
+
+        delegatedRecord.set(0, LABEL);
+        final Object fieldReverted = delegatedRecord.getField(LABEL_FIELD);
+        assertEquals(LABEL, fieldReverted);
+    }
+
+    /**
+     * Verify that fields populated with java.sql Timestamp, Date, and Time values are returned
+     * as the corresponding java.time LocalDateTime, LocalDate, and LocalTime values.
+     */
+    @Test
+    void testGetTimestampDateTimeFields() {
+        final RecordSchema recordSchema = new SimpleRecordSchema(
+                List.of(
+                        new RecordField(CREATED_FIELD, RecordFieldType.TIMESTAMP.getDataType()),
+                        new RecordField(UPDATED_FIELD, RecordFieldType.DATE.getDataType()),
+                        new RecordField(STOPPED_FIELD, RecordFieldType.TIME.getDataType())
+                )
+        );
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put(CREATED_FIELD, CREATED);
+        values.put(UPDATED_FIELD, UPDATED);
+        values.put(STOPPED_FIELD, STOPPED);
+
+        final Record record = new MapRecord(recordSchema, values);
+
+        final Types.StructType structType = Types.StructType.of();
+        final DelegatedRecord delegatedRecord = new DelegatedRecord(record, structType);
+
+        final Object created = delegatedRecord.getField(CREATED_FIELD);
+        assertEquals(CREATED_CONVERTED, created);
+
+        final Object updated = delegatedRecord.getField(UPDATED_FIELD);
+        assertEquals(UPDATED_CONVERTED, updated);
+
+        final Object stopped = delegatedRecord.getField(STOPPED_FIELD);
+        assertEquals(STOPPED_CONVERTED, stopped);
+    }
+}


Reply via email to