dariuszseweryn commented on PR #10053:
URL: https://github.com/apache/nifi/pull/10053#issuecomment-3078595349
Taking a step back then.
# Problems with `KinesisRecordProcessorRecord`
There are several functional problems with `KinesisRecordProcessorRecord`:
1. The class does not complete a FlowFile if an exception is thrown while the
last record in a batch is processed. NiFi logs a warning, but such a FlowFile
was observed being picked up by a downstream processor without any attributes.
1. The class does not handle well cases where the schema of subsequent records
differs, especially when handled reactively, because exceptions thrown by the
writers may leave the FlowFile in an inconsistent state, requiring a full
transaction rollback and effectively halting the flow until human intervention.
This is a problem in two situations:
   - When the schema is inferred from the record itself. I can understand that
this is considered "best effort" support, yet it is a problem from a
correctness standpoint.
   - When the schema is referenced in the record header via
`ConfluentEncodedSchemaReferenceReader` or `GlueEncodedSchemaReferenceReader`,
which, I think, should be a valid, fully supported use case. There are
of course several approaches to this, which I will continue in a separate
paragraph.
1. The class incorrectly calculates the `record.count` attribute when multiple
intermediate records are present in `kinesisClientRecord.data()` for the last
Kinesis record processed in a batch. (If the last Kinesis record contains 2
intermediate records, `record.count` is reported 1 higher than it should be;
3 intermediate records = +3, 4 = +6, 5 = +10, and so on.)
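The surplus above grows as a triangular series. As a sanity check of the
arithmetic (the accumulator below is an illustrative stand-in, not the actual
NiFi counting code), re-adding the count of previously written intermediate
records on every extra pass produces exactly those offsets:

```java
public class RecordCountSurplus {
    // Stand-in for the miscounting: for n intermediate records in the last
    // Kinesis record, the records seen so far are counted again on every
    // extra pass, so the surplus is 1 + 2 + ... + (n - 1), the (n-1)th
    // triangular number.
    public static int surplus(int intermediateRecords) {
        int extra = 0;
        for (int pass = 1; pass < intermediateRecords; pass++) {
            extra += pass; // re-counts the 'pass' records already written
        }
        return extra;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.println(n + " intermediate records -> record.count off by +" + surplus(n));
        }
    }
}
```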
There is also a non-functional problem with `KinesisRecordProcessorRecord`: it
contains and assumes a lot of state (FlowFile, RecordWriter, OutputStream,
Session, the contents of the passed `List<FlowFile>`). Consolidating state
management is recommended to lower the cognitive burden for future readers. In
this PR I have consolidated the state into a `FlowFileState` class which is
instantiated only in the `#processRecord` function and encapsulates the other
state-altering interactions.
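A minimal sketch of the consolidation idea, using plain Java stand-ins for the
NiFi types (the actual `FlowFileState` in this PR wraps FlowFile,
RecordSetWriter, OutputStream and ProcessSession; the names and shape below are
illustrative only):

```java
// Illustrative stand-in for a consolidated state holder. Making it
// AutoCloseable guarantees completion (attribute finalization) even when an
// exception interrupts processing of the last record in a batch.
final class FlowFileState implements AutoCloseable {
    private final StringBuilder content = new StringBuilder(); // stand-in for the OutputStream
    private int recordCount = 0;
    private boolean completed = false;

    void write(String record) {
        content.append(record).append('\n');
        recordCount++;
    }

    @Override
    public void close() {
        // Finalize exactly once, whether or not an exception was thrown.
        completed = true;
    }

    int recordCount() { return recordCount; }
    boolean isCompleted() { return completed; }
}
```

Because all mutation goes through one object, try-with-resources in
`#processRecord` is enough to guarantee the FlowFile is always completed.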
# Approaches to records that have incompatible schema
Currently the processor assumes the first Kinesis record determines the schema
for the whole FlowFile. If a subsequent record's schema does not match, then
depending on the mismatch it will either not be fully written (additional
fields in subsequent records are omitted) or an exception will be thrown (if a
field's content cannot be coerced, e.g. a too-large numeric value or a
different data type), potentially leaving the FlowFile in an inconsistent state
(verified for `JsonRecordSetWriter`).
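To make the inconsistency concrete, assume a writer that streams records into a
JSON array one element at a time (a simplification of the streamed-write
behavior, not the actual `JsonRecordSetWriter` code): if coercion fails
mid-record, the stream is abandoned with the array never closed, so the
FlowFile content is not parseable JSON:

```java
public class PartialJsonWrite {
    // Simulates a streaming JSON writer that fails mid-record: the array is
    // opened and partially filled, but never closed.
    public static String writeRecords(String[] jsonRecords, int failAtIndex) {
        StringBuilder out = new StringBuilder("[");
        for (int i = 0; i < jsonRecords.length; i++) {
            if (i > 0) {
                out.append(',');
            }
            if (i == failAtIndex) {
                // Part of the offending record reaches the stream before the failure.
                out.append(jsonRecords[i], 0, jsonRecords[i].length() / 2);
                return out.toString(); // inconsistent state: no closing ']'
            }
            out.append(jsonRecords[i]);
        }
        return out.append(']').toString();
    }
}
```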
Given that there is no guarantee that subsequent records will have a matching
schema (due to wrong inference or schema encoding in the header), the current
behavior is not well suited. What can be done?
1. Add to the contract of all RecordWriter implementations that, in case of a
thrown exception, the FlowFile is left in a consistent state. This would
require changes in potentially all implemented writers and was rejected as an
approach in [NIFI-14753](https://issues.apache.org/jira/browse/NIFI-14753). It
would allow a reactive approach to schema mismatches, routing offending records
to Parse Failure (or rather a newly introduced relationship) and continuing
processing. Otherwise processing is halted until human intervention, because
the FlowFile needs to be dropped as it is in an inconsistent state (I am unsure
whether readers downstream would be able to cope with a FlowFile in such a
state).
1. Add schema validation in `KinesisRecordProcessorRecord` for each subsequent
record, which would allow completing an in-flight FlowFile and opening a new
one. It was
[rejected](https://github.com/apache/nifi/pull/10053#discussion_r2206166722)
though.
   - If there is a fear of performance impact, we could add a parameter to the
`ConsumeKinesisStream` processor indicating whether it should expect multiple
schemas in a single batch of Kinesis records and perform the compatibility
check. This does not look like a good way forward from the UX perspective, as
the reading logic would need to spill outside of the `RecordReader` definition;
the processor has no means to introspect the reader settings, so it cannot
auto-configure.
1. Make the processor do two passes over all `KinesisClientRecord`s in a batch:
first to determine the encompassing schema, second to write the data. At first
glance this approach would seem surprising to me if I were a user of a stream
that contains data in many schemas.
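The rollover behavior from approach 2 (and advocated below) can be sketched
with plain Strings standing in for `RecordSchema` (this is an illustrative
model, not the NiFi API): whenever a record's schema differs from the in-flight
FlowFile's schema, the current FlowFile is completed and a new one is opened:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SchemaRollover {
    // Partitions a batch of (schema, payload) records into consecutive groups,
    // one group per in-flight "FlowFile". A schema change completes the
    // current group and opens a new one, instead of failing the whole batch.
    public static List<Map.Entry<String, List<String>>> partition(List<Map.Entry<String, String>> records) {
        List<Map.Entry<String, List<String>>> flowFiles = new ArrayList<>();
        String currentSchema = null;
        List<String> current = null;
        for (Map.Entry<String, String> rec : records) {
            if (!rec.getKey().equals(currentSchema)) {
                currentSchema = rec.getKey();
                current = new ArrayList<>();
                flowFiles.add(new SimpleEntry<>(currentSchema, current));
            }
            current.add(rec.getValue());
        }
        return flowFiles;
    }
}
```

With a single static schema the batch collapses to one group, so the only added
cost is the per-record schema comparison.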
Personally I think that, from a correctness point of view, the processor should
be able to close the currently processed FlowFile when a subsequent record's
schema does not match, and create a new FlowFile. This could have performance
implications for users whose records alternate schemas on every record, but it
should be a technically correct implementation. For users with a single, static
schema setup the only potential downside would be the performance impact of
validating the schema for each record, which may well be negligible anyway.