dariuszseweryn commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2185572729
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -197,7 +278,75 @@ private void outputRawRecordOnException(final boolean firstOutputRecord, final F
     private Map<String, String> getDefaultAttributes(final KinesisClientRecord kinesisRecord) {
         final String partitionKey = kinesisRecord.partitionKey();
         final String sequenceNumber = kinesisRecord.sequenceNumber();
+        final long subSequenceNumber = kinesisRecord.subSequenceNumber();
         final Instant approximateArrivalTimestamp = kinesisRecord.approximateArrivalTimestamp();
-        return getDefaultAttributes(sequenceNumber, partitionKey, approximateArrivalTimestamp);
+        return getDefaultAttributes(sequenceNumber, subSequenceNumber, partitionKey, approximateArrivalTimestamp);
+    }
+
+    void closeSafe(final Closeable closeable, final String closeableName) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                getLogger().warn("Failed to close {} due to {}", closeableName, e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    private record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord, WriteResult writeResult) { }
+
+    private class FlowFileState implements Closeable {
Review Comment:
It is a little more than that: I wanted to convey the notion of all the
state associated with the `FlowFile`, not only the schema.