This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2dd3059a254 NIFI-15568 Fix Timestamp Partitioning in PutIcebergRecord
(#10996)
2dd3059a254 is described below
commit 2dd3059a254c919f4dd6f51e30c6c41e1068e602
Author: David Handermann <[email protected]>
AuthorDate: Thu Mar 12 13:16:31 2026 -0500
NIFI-15568 Fix Timestamp Partitioning in PutIcebergRecord (#10996)
* NIFI-15568 Fixed Timestamp Partitioning in PutIcebergRecord
- Added conditional conversion of partition key fields for ParquetIcebergWriter
- Added conditional conversion of java.sql types to java.time types for PutIcebergRecord
---
.../parquet/io/ParquetPartitionedWriter.java | 8 +-
.../iceberg/parquet/io/PartitionKeyRecord.java | 121 +++++++++++++++++
.../iceberg/parquet/ParquetIcebergWriterTest.java | 92 +++++++++++--
.../iceberg/parquet/io/PartitionKeyRecordTest.java | 147 +++++++++++++++++++++
.../processors/iceberg/record/DelegatedRecord.java | 7 +-
.../processors/iceberg/record/RecordConverter.java | 101 ++++++++++++++
.../iceberg/record/DelegatedRecordTest.java | 138 +++++++++++++++++++
7 files changed, 600 insertions(+), 14 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
index e5bc9a18657..4f1a3786abc 100644
---
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/ParquetPartitionedWriter.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.types.Types;
/**
* Parquet implementation of Partition Writer with Partition Key derived from
configured Schema definition
@@ -32,6 +33,7 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
public class ParquetPartitionedWriter extends PartitionedFanoutWriter<Record> {
private final PartitionKey partitionKey;
+ private final PartitionKeyRecord partitionKeyRecord;
public ParquetPartitionedWriter(
final PartitionSpec spec,
@@ -43,11 +45,15 @@ public class ParquetPartitionedWriter extends
PartitionedFanoutWriter<Record> {
) {
super(spec, FileFormat.PARQUET, appenderFactory, fileFactory, io,
targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
+ final Types.StructType struct = schema.asStruct();
+ this.partitionKeyRecord = new PartitionKeyRecord(struct);
}
@Override
protected PartitionKey partition(final Record record) {
- partitionKey.partition(record);
+ // Partition Key Record handles conversion of selected Field Types
+ partitionKeyRecord.wrap(record);
+ partitionKey.partition(partitionKeyRecord);
return partitionKey;
}
}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
new file mode 100644
index 00000000000..1bde17efb0a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/main/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecord.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Partition Key Record Wrapper based on Apache Iceberg InternalRecordWrapper to avoid iceberg-data dependency
+ *
+ * Holds one optional converter per field of the supplied struct type. Reads through get() apply the
+ * converter for the requested position: java.time values are passed to DateTimeUtil, byte arrays are
+ * wrapped in a ByteBuffer, and nested StructLike values are wrapped in their own PartitionKeyRecord.
+ * Fields of any other type are returned from the wrapped record without conversion.
+ */
+class PartitionKeyRecord implements StructLike {
+ private final Function<Object, Object>[] converters; // indexed by field position; a null entry means no conversion
+
+ private StructLike wrapped = null; // source record; remains null until wrap() is called
+
+ @SuppressWarnings("unchecked") // covers the cast of the reflectively created Function array
+ PartitionKeyRecord(final Types.StructType structType) {
+ final List<Types.NestedField> fields = structType.fields();
+
+ converters = fields.stream() // one converter lookup per field, in field order
+ .map(Types.NestedField::type)
+ .map(PartitionKeyRecord::getConverter)
+ .toArray(length -> (Function<Object, Object>[])
Array.newInstance(Function.class, length));
+ }
+
+ @Override
+ public int size() { // delegates to the wrapped record, so wrap() must have been called first
+ return wrapped.size();
+ }
+
+ @Override
+ public <T> T get(final int position, final Class<T> javaClass) { // returns the field at position, converted when a converter exists
+ final T processed;
+
+ final Function<Object, Object> converter = converters[position];
+ if (converter == null) { // no conversion for this field type: read directly as the requested class
+ processed = wrapped.get(position, javaClass);
+ } else {
+ final Object value = wrapped.get(position, Object.class); // read as Object so the converter receives the source value
+ if (value == null) { // null values bypass the converter
+ processed = null;
+ } else {
+ final Object converted = converter.apply(value);
+ processed = javaClass.cast(converted); // throws ClassCastException when the converted value is not a javaClass instance
+ }
+ }
+
+ return processed;
+ }
+
+ @Override
+ public <T> void set(final int position, final T value) { // read-only wrapper: always throws
+ throw new UnsupportedOperationException("Set method not supported");
+ }
+
+ PartitionKeyRecord wrap(final StructLike wrapped) { // points this wrapper at a new source record and returns the same instance
+ this.wrapped = wrapped;
+ return this;
+ }
+
+ private static Function<Object, Object> getConverter(final Type fieldType)
{ // selects the converter for a field type; returns null when no conversion applies
+ final Type.TypeID typeId = fieldType.typeId();
+
+ final Function<Object, Object> converter;
+
+ if (Type.TypeID.TIMESTAMP_NANO == typeId) { // nanosecond timestamps
+ final Types.TimestampNanoType timestampNanoType =
(Types.TimestampNanoType) fieldType;
+ if (timestampNanoType.shouldAdjustToUTC()) { // adjust-to-UTC variant casts the value to OffsetDateTime
+ converter = dateTime ->
DateTimeUtil.nanosFromTimestamptz((OffsetDateTime) dateTime);
+ } else { // otherwise the value is cast to LocalDateTime
+ converter = dateTime ->
DateTimeUtil.nanosFromTimestamp((LocalDateTime) dateTime);
+ }
+ } else if (Type.TypeID.TIMESTAMP == typeId) { // microsecond timestamps
+ final Types.TimestampType timestampType = (Types.TimestampType)
fieldType;
+ if (timestampType.shouldAdjustToUTC()) { // adjust-to-UTC variant casts the value to OffsetDateTime
+ converter = dateTime ->
DateTimeUtil.microsFromTimestamptz((OffsetDateTime) dateTime);
+ } else { // otherwise the value is cast to LocalDateTime
+ converter = dateTime ->
DateTimeUtil.microsFromTimestamp((LocalDateTime) dateTime);
+ }
+ } else if (Type.TypeID.DATE == typeId) { // LocalDate to days via DateTimeUtil.daysFromDate
+ converter = date -> DateTimeUtil.daysFromDate((LocalDate) date);
+ } else if (Type.TypeID.TIME == typeId) { // LocalTime to microseconds via DateTimeUtil.microsFromTime
+ converter = time -> DateTimeUtil.microsFromTime((LocalTime) time);
+ } else if (Type.TypeID.FIXED == typeId) { // byte[] is wrapped, not copied, into a ByteBuffer
+ converter = bytes -> ByteBuffer.wrap((byte[]) bytes);
+ } else if (Type.TypeID.STRUCT == typeId) { // nested struct: one wrapper is created here and re-pointed at each value
+ final Types.StructType fieldStructType = fieldType.asStructType();
+ final PartitionKeyRecord partitionKeyRecord = new
PartitionKeyRecord(fieldStructType);
+ converter = struct -> partitionKeyRecord.wrap((StructLike) struct);
+ } else { // all remaining types are read without conversion
+ converter = null;
+ }
+
+ return converter;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
index e06ea7138e9..636b0ba091f 100644
---
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/ParquetIcebergWriterTest.java
@@ -18,8 +18,10 @@ package org.apache.nifi.services.iceberg.parquet;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -29,6 +31,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.iceberg.IcebergRowWriter;
import org.apache.nifi.util.NoOpProcessor;
@@ -41,11 +44,16 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -60,6 +68,14 @@ class ParquetIcebergWriterTest {
private static final String FIRST_FIELD_VALUE = "value";
+ private static final int CREATED_FIELD_ID = 1;
+
+ private static final String CREATED_FIELD_NAME = "created";
+
+ private static final LocalDateTime CREATED_FIELD_VALUE = LocalDateTime.of(
+ LocalDate.ofEpochDay(1), LocalTime.ofSecondOfDay(0)
+ );
+
private ParquetIcebergWriter parquetIcebergWriter;
private TestRunner runner;
@@ -67,9 +83,6 @@ class ParquetIcebergWriterTest {
@Mock
private Table table;
- @Mock
- private PartitionSpec spec;
-
@Mock
private FileIO io;
@@ -103,8 +116,9 @@ class ParquetIcebergWriterTest {
final Schema schema = getSchema();
final InMemoryOutputFile outputFile = new InMemoryOutputFile();
- setTable(schema, outputFile);
- when(spec.isUnpartitioned()).thenReturn(true);
+ final PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ setTable(schema, partitionSpec, outputFile);
+
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
final IcebergRowWriter rowWriter =
parquetIcebergWriter.getRowWriter(table);
@@ -117,16 +131,76 @@ class ParquetIcebergWriterTest {
final Schema schema = getSchema();
final InMemoryOutputFile outputFile = new InMemoryOutputFile();
- setTable(schema, outputFile);
- when(spec.isUnpartitioned()).thenReturn(true);
+ final PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ setTable(schema, partitionSpec, outputFile);
+
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
+
+ final IcebergRowWriter rowWriter =
parquetIcebergWriter.getRowWriter(table);
+ writeRow(schema, rowWriter);
+
+ final DataFile[] dataFiles = rowWriter.dataFiles();
+ final byte[] serialized = outputFile.toByteArray();
+ assertDataFilesFound(dataFiles, serialized);
+ }
+
+ @Test
+ void testWriteDataFilesPartitioned() throws IOException {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ final Schema schema = getSchema();
+ final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ final PartitionSpec partitionSpec =
PartitionSpec.builderFor(schema).identity(FIRST_FIELD_NAME).build();
+ setTable(schema, partitionSpec, outputFile);
+ when(locationProvider.newDataLocation(eq(partitionSpec),
isA(StructLike.class), anyString())).thenReturn(LOCATION);
final IcebergRowWriter rowWriter =
parquetIcebergWriter.getRowWriter(table);
+ writeRow(schema, rowWriter);
+ final DataFile[] dataFiles = rowWriter.dataFiles();
+ final byte[] serialized = outputFile.toByteArray();
+ assertDataFilesFound(dataFiles, serialized);
+ }
+
+ @Test
+ void testWriteDataFilesPartitionedTimestamp() throws IOException {
+ runner.enableControllerService(parquetIcebergWriter);
+
+ final Types.NestedField firstNestedField =
Types.NestedField.required(FIRST_FIELD_ID, FIRST_FIELD_NAME,
Types.StringType.get());
+ final Types.NestedField createdNestedField =
Types.NestedField.required(CREATED_FIELD_ID, CREATED_FIELD_NAME,
Types.TimestampType.withoutZone());
+ final Schema schema = new Schema(firstNestedField, createdNestedField);
+ final InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ final PartitionSpec partitionSpec =
PartitionSpec.builderFor(schema).identity(CREATED_FIELD_NAME).build();
+
+ setTable(schema, partitionSpec, outputFile);
+ when(locationProvider.newDataLocation(eq(partitionSpec),
isA(StructLike.class), anyString())).thenReturn(LOCATION);
+
+ final IcebergRowWriter rowWriter =
parquetIcebergWriter.getRowWriter(table);
final GenericRecord row = GenericRecord.create(schema);
row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
+ row.setField(CREATED_FIELD_NAME, CREATED_FIELD_VALUE);
rowWriter.write(row);
final DataFile[] dataFiles = rowWriter.dataFiles();
+ final byte[] serialized = outputFile.toByteArray();
+ assertDataFilesFound(dataFiles, serialized);
+
+ final DataFile dataFile = dataFiles[0];
+ final StructLike partition = dataFile.partition();
+ assertInstanceOf(PartitionData.class, partition);
+ final PartitionData partitionData = (PartitionData) partition;
+ final Object partitionField = partitionData.get(0);
+
+ final long microsecondsExpected =
DateTimeUtil.microsFromTimestamp(CREATED_FIELD_VALUE);
+ assertEquals(microsecondsExpected, partitionField);
+ }
+
+ private void writeRow(final Schema schema, final IcebergRowWriter
rowWriter) throws IOException {
+ final GenericRecord row = GenericRecord.create(schema);
+ row.setField(FIRST_FIELD_NAME, FIRST_FIELD_VALUE);
+ rowWriter.write(row);
+ }
+
+ private void assertDataFilesFound(final DataFile[] dataFiles, final byte[]
serialized) {
assertNotNull(dataFiles);
assertEquals(1, dataFiles.length);
@@ -135,7 +209,6 @@ class ParquetIcebergWriterTest {
assertEquals(FileFormat.PARQUET, dataFile.format());
assertEquals(1, dataFile.recordCount());
- final byte[] serialized = outputFile.toByteArray();
assertEquals(serialized.length, dataFile.fileSizeInBytes());
}
@@ -144,13 +217,12 @@ class ParquetIcebergWriterTest {
return new Schema(nestedField);
}
- private void setTable(final Schema schema, final OutputFile outputFile) {
+ private void setTable(final Schema schema, final PartitionSpec spec, final
OutputFile outputFile) {
when(table.schema()).thenReturn(schema);
when(table.spec()).thenReturn(spec);
when(table.io()).thenReturn(io);
when(table.locationProvider()).thenReturn(locationProvider);
when(table.encryption()).thenReturn(encryptionManager);
-
when(locationProvider.newDataLocation(anyString())).thenReturn(LOCATION);
when(io.newOutputFile(eq(LOCATION))).thenReturn(outputFile);
when(encryptionManager.encrypt(eq(outputFile))).thenReturn(encryptedOutputFile);
when(encryptedOutputFile.encryptingOutputFile()).thenReturn(outputFile);
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
new file mode 100644
index 00000000000..a01dd71562a
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-parquet-writer/src/test/java/org/apache/nifi/services/iceberg/parquet/io/PartitionKeyRecordTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg.parquet.io;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PartitionKeyRecordTest { // checks the per-type value conversion applied by PartitionKeyRecord.get()
+
+ private static final LocalDateTime CREATED =
LocalDateTime.of(LocalDate.ofEpochDay(0), LocalTime.ofSecondOfDay(0)); // epoch day 0 at second 0 of the day
+ private static final OffsetDateTime CREATED_ZONED =
OffsetDateTime.of(CREATED, ZoneOffset.UTC); // same instant expressed with a UTC offset
+ private static final LocalDate UPDATED = LocalDate.ofEpochDay(0);
+ private static final LocalTime STOPPED = LocalTime.ofSecondOfDay(300); // 300 seconds after midnight
+
+ private static final String FIRST_FIELD_NAME = "firstField";
+
+ private static final int FIRST_FIELD_POSITION = 0; // used both as record position and as field id in getSchema()
+
+ @Test
+ void testTimestampNanoTypeConverted() { // LocalDateTime is expected back as DateTimeUtil.nanosFromTimestamp
+ final Record record =
setFirstField(Types.TimestampNanoType.withoutZone(), CREATED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final long expected = DateTimeUtil.nanosFromTimestamp(CREATED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testTimestampNanoTypeZonedConverted() { // OffsetDateTime is expected back as DateTimeUtil.nanosFromTimestamptz
+ final Record record =
setFirstField(Types.TimestampNanoType.withZone(), CREATED_ZONED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final long expected = DateTimeUtil.nanosFromTimestamptz(CREATED_ZONED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testTimestampTypeConverted() { // LocalDateTime is expected back as DateTimeUtil.microsFromTimestamp
+ final Record record = setFirstField(Types.TimestampType.withoutZone(),
CREATED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final long expected = DateTimeUtil.microsFromTimestamp(CREATED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testTimestampTypeZonedConverted() { // OffsetDateTime is expected back as DateTimeUtil.microsFromTimestamptz
+ final Record record = setFirstField(Types.TimestampType.withZone(),
CREATED_ZONED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final long expected =
DateTimeUtil.microsFromTimestamptz(CREATED_ZONED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testDateTypeConverted() { // LocalDate is expected back as DateTimeUtil.daysFromDate
+ final Record record = setFirstField(Types.DateType.get(), UPDATED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final int expected = DateTimeUtil.daysFromDate(UPDATED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testTimeTypeConverted() { // LocalTime is expected back as DateTimeUtil.microsFromTime
+ final Record record = setFirstField(Types.TimeType.get(), STOPPED);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final long expected = DateTimeUtil.microsFromTime(STOPPED);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testFixedTypeConverted() { // byte[] is expected back as an equal ByteBuffer
+ final byte[] bytes =
String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);
+
+ final Record record =
setFirstField(Types.FixedType.ofLength(bytes.length), bytes);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ final ByteBuffer expected = ByteBuffer.wrap(bytes);
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ @Test
+ void testStringTypeUnchanged() { // a String field is expected back unchanged
+ final String expected = String.class.getName();
+
+ final Record record = setFirstField(Types.StringType.get(), expected);
+ final Object wrappedFirstField = getWrappedFirstField(record);
+
+ assertEquals(expected, wrappedFirstField);
+ }
+
+ private Object getWrappedFirstField(final Record record) { // wraps the record and reads position 0 as Object
+ final PartitionKeyRecord partitionKeyRecord = new
PartitionKeyRecord(record.struct());
+ partitionKeyRecord.wrap(record);
+ return partitionKeyRecord.get(FIRST_FIELD_POSITION, Object.class);
+ }
+
+ private Record setFirstField(final Type firstFieldType, final Object
firstField) { // builds a single-field record of the given type holding the given value
+ final Schema schema = getSchema(firstFieldType);
+ final Record record = GenericRecord.create(schema);
+ record.set(FIRST_FIELD_POSITION, firstField);
+ return record;
+ }
+
+ private Schema getSchema(final Type firstFieldType) { // schema with one required field named by FIRST_FIELD_NAME
+ final Types.NestedField field = Types.NestedField.builder()
+ .ofType(firstFieldType)
+ .asRequired()
+ .withId(FIRST_FIELD_POSITION) // the position constant doubles as the field id
+ .withName(FIRST_FIELD_NAME)
+ .build();
+ final List<Types.NestedField> fields = List.of(field);
+ return new Schema(fields);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
index c9ab79c4287..3267cc32214 100644
---
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/DelegatedRecord.java
@@ -22,6 +22,7 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@@ -37,7 +38,7 @@ public class DelegatedRecord implements Record {
final org.apache.nifi.serialization.record.Record record,
final Types.StructType struct
) {
- this.record = Objects.requireNonNull(record);
+ this.record =
RecordConverter.getConvertedRecord(Objects.requireNonNull(record));
this.struct = Objects.requireNonNull(struct);
}
@@ -91,14 +92,14 @@ public class DelegatedRecord implements Record {
}
/**
- * Create and return a copy of the Record
+ * Create and return a copy of the Record with new Map of fields and values
*
* @param overrides Fields and values to override in the copied Record
* @return Copy of the Record
*/
@Override
public Record copy(final Map<String, Object> overrides) {
- final Map<String, Object> values = record.toMap();
+ final Map<String, Object> values = new LinkedHashMap<>(record.toMap());
values.putAll(overrides);
final MapRecord mapRecord = new MapRecord(record.getSchema(), values);
return new DelegatedRecord(mapRecord, struct);
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
new file mode 100644
index 00000000000..b84159d4af0
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/record/RecordConverter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Record Converter handles translating field values to types compatible with Apache Iceberg Records
+ *
+ * Conversion runs only when the Record Schema declares at least one TIMESTAMP, DATE, or TIME field.
+ * In that case the values of the Record are copied into a new MapRecord, replacing java.sql.Timestamp,
+ * java.sql.Date, and java.sql.Time values with the results of toLocalDateTime(), toLocalDate(), and
+ * toLocalTime(). Otherwise the input Record instance is returned unchanged.
+ */
+class RecordConverter {
+
+ private static final Set<RecordFieldType> CONVERSION_REQUIRED_FIELD_TYPES
= Set.of( // schema field types that trigger value conversion
+ RecordFieldType.TIMESTAMP,
+ RecordFieldType.DATE,
+ RecordFieldType.TIME
+ );
+
+ /**
+ * Get Converted Record with conditional handling for field values requiring translation
+ *
+ * @param inputRecord Input Record to be converted
+ * @return Input Record or new Record with converted field values
+ */
+ static Record getConvertedRecord(final Record inputRecord) {
+ final Record convertedRecord;
+
+ final RecordSchema recordSchema = inputRecord.getSchema();
+ if (isConversionRequired(recordSchema)) { // copy and convert only when the schema declares a matching field type
+ final Map<String, Object> values = inputRecord.toMap();
+ convertedRecord = getConvertedRecord(recordSchema, values);
+ } else { // nothing to convert: hand back the same instance
+ convertedRecord = inputRecord;
+ }
+
+ return convertedRecord;
+ }
+
+ private static Record getConvertedRecord(final RecordSchema recordSchema,
final Map<String, Object> values) { // builds a new MapRecord with each value passed through getConvertedValue
+ final Map<String, Object> convertedValues = new LinkedHashMap<>(); // keeps the iteration order of the source map
+
+ for (final Map.Entry<String, Object> entry : values.entrySet()) {
+ final String field = entry.getKey();
+ final Object value = entry.getValue();
+ final Object converted = getConvertedValue(value);
+ convertedValues.put(field, converted);
+ }
+
+ return new MapRecord(recordSchema, convertedValues); // the schema is reused as-is; only values change
+ }
+
+ private static Object getConvertedValue(final Object value) { // converts by runtime type of the value, not by declared field type
+ return switch (value) {
+ // Convert java.sql types to corresponding java.time types for
Apache Iceberg
+ case Timestamp timestamp -> timestamp.toLocalDateTime();
+ case Date date -> date.toLocalDate();
+ case Time time -> time.toLocalTime();
+ case null, default -> value; // nulls and every other type pass through unchanged
+ };
+ }
+
+ private static boolean isConversionRequired(final RecordSchema
recordSchema) { // true when any field returned by getFields() is TIMESTAMP, DATE, or TIME
+ final List<RecordField> fields = recordSchema.getFields();
+
+ for (final RecordField field : fields) {
+ final DataType dataType = field.getDataType();
+ final RecordFieldType recordFieldType = dataType.getFieldType();
+ if (CONVERSION_REQUIRED_FIELD_TYPES.contains(recordFieldType)) {
+ return true; // first match is enough
+ }
+ }
+
+ return false;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
new file mode 100644
index 00000000000..7e43a5c4448
--- /dev/null
+++
b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/record/DelegatedRecordTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.record;
+
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DelegatedRecordTest { // exercises DelegatedRecord copy, field accessors, and java.sql to java.time conversion
+
+ private static final String LABEL_FIELD = "label";
+ private static final String LABEL = "delegated-record";
+ private static final String LABEL_MODIFIED = "modified-record";
+
+ private static final String CREATED_FIELD = "created";
+ private static final String UPDATED_FIELD = "updated";
+ private static final String STOPPED_FIELD = "stopped";
+ private static final Timestamp CREATED = Timestamp.valueOf("2026-01-01
12:30:45.123456789");
+ private static final LocalDateTime CREATED_CONVERTED =
CREATED.toLocalDateTime(); // expected value derived from the same java.sql input
+ private static final Date UPDATED = Date.valueOf("2026-02-03");
+ private static final LocalDate UPDATED_CONVERTED = UPDATED.toLocalDate();
+ private static final Time STOPPED = Time.valueOf("23:30:45");
+ private static final LocalTime STOPPED_CONVERTED = STOPPED.toLocalTime();
+
+ @Test
+ void testCopyEmptyRecord() { // a record with no fields copies to an equal DelegatedRecord
+ final List<RecordField> recordFields = List.of();
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+ final Record record = new MapRecord(recordSchema, new
LinkedHashMap<>());
+
+ final Types.StructType structType = Types.StructType.of(); // struct with no fields
+ final DelegatedRecord delegatedRecord = new DelegatedRecord(record,
structType);
+
+ assertEquals(recordSchema.getFieldCount(), delegatedRecord.size());
+
+ final org.apache.iceberg.data.Record copiedRecord =
delegatedRecord.copy();
+ assertEquals(delegatedRecord, copiedRecord);
+
+ assertEquals(delegatedRecord.hashCode(), record.hashCode()); // hash code is expected to match the source NiFi Record
+ }
+
+ @Test
+ void testSetGetStringField() { // a String field can be read by name and position, then set and read back
+ final RecordSchema recordSchema = new SimpleRecordSchema(
+ List.of(
+ new RecordField(LABEL_FIELD,
RecordFieldType.STRING.getDataType())
+ )
+ );
+ final Map<String, Object> values = new LinkedHashMap<>();
+ values.put(LABEL_FIELD, LABEL);
+
+ final Record record = new MapRecord(recordSchema, values);
+
+ final Types.StructType structType = Types.StructType.of(); // struct is left empty; size() is compared with the record schema below
+ final DelegatedRecord delegatedRecord = new DelegatedRecord(record,
structType);
+
+ final Types.StructType recordStruct = delegatedRecord.struct();
+ assertEquals(structType, recordStruct);
+ assertEquals(recordSchema.getFieldCount(), delegatedRecord.size());
+
+ final Object field = delegatedRecord.getField(LABEL_FIELD); // lookup by name
+ assertEquals(LABEL, field);
+
+ final Object firstField = delegatedRecord.get(0); // lookup by position
+ assertEquals(LABEL, firstField);
+
+ final String firstStringField = delegatedRecord.get(0, String.class); // lookup by position with target class
+ assertEquals(LABEL, firstStringField);
+
+ delegatedRecord.setField(LABEL_FIELD, LABEL_MODIFIED); // update by name
+ final Object fieldModified = delegatedRecord.getField(LABEL_FIELD);
+ assertEquals(LABEL_MODIFIED, fieldModified);
+
+ delegatedRecord.set(0, LABEL); // update by position restores the original value
+ final Object fieldReverted = delegatedRecord.getField(LABEL_FIELD);
+ assertEquals(LABEL, fieldReverted);
+ }
+
+ @Test
+ void testGetTimestampDateTimeFields() { // java.sql values go in; java.time values are expected from getField()
+ final RecordSchema recordSchema = new SimpleRecordSchema(
+ List.of(
+ new RecordField(CREATED_FIELD,
RecordFieldType.TIMESTAMP.getDataType()),
+ new RecordField(UPDATED_FIELD,
RecordFieldType.DATE.getDataType()),
+ new RecordField(STOPPED_FIELD,
RecordFieldType.TIME.getDataType())
+ )
+ );
+ final Map<String, Object> values = new LinkedHashMap<>();
+ values.put(CREATED_FIELD, CREATED);
+ values.put(UPDATED_FIELD, UPDATED);
+ values.put(STOPPED_FIELD, STOPPED);
+
+ final Record record = new MapRecord(recordSchema, values);
+
+ final Types.StructType structType = Types.StructType.of();
+ final DelegatedRecord delegatedRecord = new DelegatedRecord(record,
structType);
+
+ final Object created = delegatedRecord.getField(CREATED_FIELD);
+ assertEquals(CREATED_CONVERTED, created); // Timestamp is expected as LocalDateTime
+
+ final Object updated = delegatedRecord.getField(UPDATED_FIELD);
+ assertEquals(UPDATED_CONVERTED, updated); // Date is expected as LocalDate
+
+ final Object stopped = delegatedRecord.getField(STOPPED_FIELD);
+ assertEquals(STOPPED_CONVERTED, stopped); // Time is expected as LocalTime
+ }
+}