Sönke Liebau created NIFI-16067:
-----------------------------------
Summary: PutIcebergRecord maps fields by ordinal position instead of column name
Key: NIFI-16067
URL: https://issues.apache.org/jira/browse/NIFI-16067
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Affects Versions: 2.10.0, 2.9.0, 2.7.2, 2.7.1, 2.8.0, 2.7.0
Reporter: Sönke Liebau
h3. Description
PutIcebergRecord (added in NIFI-15062) doesn't match columns by name. It writes each record's values into the Iceberg table columns by position, and the position it uses is the one from the *incoming* record schema, not the table schema. So the mapping is wrong whenever the input field order doesn't line up with the table's column order.
If the record has fewer fields than the table, the missing columns aren't set to NULL either - the values just shift left into whatever table column happens to sit at that position. Sometimes that lands on an incompatible type and you get a ClassCastException, which is the good outcome because at least it fails loudly. When the types happen to line up, the row is written with values in the wrong columns and nothing complains.
This gets worse with schema evolution: as soon as you add a column, the producer and the table no longer agree on the column set, and existing flows start silently writing data into the wrong place.
h3. Steps to reproduce
Create an Iceberg table whose column order differs from the incoming record (or add a new column so the record has fewer fields than the table), then push records through PutIcebergRecord into it. Depending on the type alignment you'll either see a ClassCastException or rows written into the wrong columns.
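To make the silent variant of this concrete, here is a minimal sketch in plain Java. It does not use NiFi or Iceberg classes; {{OrdinalMappingDemo}} and {{writeByPosition}} are hypothetical names that only imitate the position-based mapping described above, using an input record whose field order differs from the table's.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the reported behaviour; not NiFi or Iceberg code.
class OrdinalMappingDemo {

    // Table column i receives input field i, regardless of the field's name.
    // For simplicity this sketch assumes both sides have the same number of fields.
    static Map<String, Object> writeByPosition(List<String> tableColumns,
                                               Map<String, Object> input) {
        // LinkedHashMap preserves insertion order, so this is the input field order.
        List<Object> inputValues = new ArrayList<>(input.values());
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < tableColumns.size(); i++) {
            row.put(tableColumns.get(i), inputValues.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> tableColumns = List.of("id", "first_name", "last_name");

        // Same field names as the table, but last_name comes before first_name.
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("id", 1L);
        input.put("last_name", "Doe");
        input.put("first_name", "Jane");

        // Both swapped fields are strings, so nothing fails; the row is just wrong.
        System.out.println(writeByPosition(tableColumns, input));
    }
}
```

In this sketch the resulting row has "Doe" under first_name and "Jane" under last_name, which is the case where the types line up and nothing complains.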
h3. Root cause
{{DelegatedRecord.get(int position)}} looks the position up against the input record schema instead of the Iceberg struct the wrapper already holds:
nifi-iceberg-processors/.../record/DelegatedRecord.java, lines 78-82:
{code:java}
@Override
public Object get(final int position) {
    final RecordField recordField = record.getSchema().getField(position); // input schema
    return record.getValue(recordField);
}
{code}
The Iceberg write path walks the table struct position by position and calls {{get(i)}} for column i, but {{get(i)}} hands back input field i, so column i ends up with input field i's value no matter what it's named. The struct is only exposed through {{struct()}} and never used for the actual lookup. {{get(int, Class)}} just delegates to {{get(int)}}, so it's wrong too.
h3. Fix
Look the field up by name instead of by position, something along these lines:
{code:java}
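@Override
public Object get(final int position) {
    // Resolve the table column at this position from the Iceberg struct ...
    final Types.NestedField field = struct.fields().get(position);
    // ... then read the input record by that column's name.
    return record.getValue(field.name());
}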
{code}
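The effect of a name-based lookup can be sketched in plain Java as well. Again this is not NiFi or Iceberg code: {{NameMappingDemo}} and {{writeByName}} are hypothetical names, and the sketch assumes that reading an absent field by name yields null, which should be verified against the actual {{Record.getValue}} behaviour before relying on it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a name-based lookup; not NiFi or Iceberg code.
class NameMappingDemo {

    // Table column i receives the input value whose field name matches column i.
    static Map<String, Object> writeByName(List<String> tableColumns,
                                           Map<String, Object> input) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : tableColumns) {
            // Map.get returns null for an absent key, so a missing field stays null
            // instead of pulling a neighbouring value into this column.
            row.put(column, input.get(column));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> tableColumns = List.of("id", "first_name", "last_name");

        // Reordered input that also lacks first_name (for example, a column added later).
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("last_name", "Doe");
        input.put("id", 1L);

        System.out.println(writeByName(tableColumns, input));
    }
}
```

Here id and last_name end up in their own columns regardless of input order, and first_name is null. Type conversion between record values and Iceberg column types is out of scope for this sketch.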
We are happy to open a PR for this if the issue is confirmed.
h3. Related
NIFI-14162 and NIFI-14614 are the same ordinal-vs-name problem in QueryRecord (both fixed for 2.7.0). This is the Iceberg-bundle version of it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)