awelless commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2185390712


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -197,7 +278,75 @@ private void outputRawRecordOnException(final boolean 
firstOutputRecord, final F
     private Map<String, String> getDefaultAttributes(final KinesisClientRecord 
kinesisRecord) {
         final String partitionKey = kinesisRecord.partitionKey();
         final String sequenceNumber = kinesisRecord.sequenceNumber();
+        final long subSequenceNumber = kinesisRecord.subSequenceNumber();
         final Instant approximateArrivalTimestamp = 
kinesisRecord.approximateArrivalTimestamp();
-        return getDefaultAttributes(sequenceNumber, partitionKey, 
approximateArrivalTimestamp);
+        return getDefaultAttributes(sequenceNumber, subSequenceNumber, 
partitionKey, approximateArrivalTimestamp);
+    }
+
+    void closeSafe(final Closeable closeable, final String closeableName) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                getLogger().warn("Failed to close {} due to {}", 
closeableName, e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    private record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord, 
WriteResult writeResult) { }
+
+    private class FlowFileState implements Closeable {

Review Comment:
   Nit: would `SchematizedFlowFile` or `FlowFileWithSchema` better represent the 
idea behind this class?



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +82,93 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (flowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(flowFileState, "FlowFile State");
+            flowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (flowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(flowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(flowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (CompleteFlowFileSingleKinesisRecordException e) {
+            final boolean removeFirstFlowFileIfAvailable = true;
+            final KinesisClientRecord kinesisRecord = e.kinesisClientRecord;
+            final byte[] data = getData(kinesisRecord);
+            outputRawRecordOnException(removeFirstFlowFileIfAvailable, 
flowFiles, session, data, kinesisRecord, e);
+        } finally {
+            flowFileState = null;
+            failedKinesisRecordCausingDataLoss = null;
+            failedKinesisRecordCausingDataLossException = null;
+        }
+    }
+
+    @Override
+    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+        if (flowFileState != null && 
!flowFiles.contains(flowFileState.flowFile)) {
+            getLogger().warn("Currently processed FlowFile is no longer 
available, this is not expected.", flowFiles);
+            closeSafe(flowFileState, "FlowFile State");
+            flowFileState = null;
         }
+        if (kinesisRecord == failedKinesisRecordCausingDataLoss)  {
+            // AbstractKinesisRecordProcessor does retry processing of failed 
records. however in case of CompleteFlowFileMultipleKinesisRecordException it 
is impossible to determine the state of
+            // the affected FlowFile and replay all records that were in it. 
To prevent data loss, the exception needs to be rethrown until it is given up 
by the abstract processor. still there may be
+            // other cases where we end up in undetermined state.
+            throw failedKinesisRecordCausingDataLossException;

Review Comment:
   Would it be possible to adjust the retrying logic instead? By telling apart 
retriable and persistent errors we can avoid doing unnecessary retries in 
`AbstractKinesisRecordProcessor`.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +177,94 @@ void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kin
         }
     }
 
-    private void createWriter(final FlowFile flowFile, final ProcessSession 
session, final Record outputRecord)
-            throws IOException, SchemaNotFoundException {
-
-        final RecordSchema readerSchema = outputRecord.getSchema();
-        final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
-        outputStream = session.write(flowFile);
-        writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
-        writer.beginRecordSet();
+    private static byte @NotNull [] getData(final KinesisClientRecord 
kinesisRecord) {
+        final ByteBuffer dataBuffer = kinesisRecord.data();
+        final byte[] data = dataBuffer != null ? new 
byte[dataBuffer.remaining()] : new byte[0];
+        if (dataBuffer != null) {
+            dataBuffer.get(data);
+        }
+        return data;
     }
 
-    private void completeFlowFile(final List<FlowFile> flowFiles, final 
ProcessSession session, final int recordCount,
-                                  final WriteResult writeResult, final 
KinesisClientRecord lastRecord, final StopWatch stopWatch)
-            throws IOException {
-
+    /**
+     * Initializes the FlowFile state for the current processing. This 
includes creating a new FlowFile, initializing the RecordSetWriter, and setting 
up the output stream.
+     * In case of an exception during initialization, the FlowFile is removed 
from the session and the resources are closed properly.
+     */
+    private FlowFileState initializeState(final ProcessSession session, final 
Record outputRecord, final FlowFileState previousFlowFileState) throws 
IOException, SchemaNotFoundException {
+        FlowFile flowFile = null;
+        OutputStream outputStream = null;
+        RecordSetWriter writer = null;
         try {
-            writer.finishRecordSet();
-        } catch (IOException e) {
-            getLogger().error("Failed to finish record output due to {}", 
e.getLocalizedMessage(), e);
-            session.remove(flowFiles.get(0));
-            flowFiles.remove(0);
-            throw e;
-        } finally {
-            try {
-                writer.close();
-                outputStream.close();
-            } catch (final IOException e) {
-                getLogger().warn("Failed to close Record Writer due to {}", 
e.getLocalizedMessage(), e);
+            flowFile = session.create();
+            // If we have a previous schema, we need to merge it with the 
current schema to ensure that the writer can handle all fields from both 
schemas going forward.
+            // There is an assumption here that all the records are expected 
by the downstream to have the same-ish schema. It is possible to imagine a 
scenario where records with different schemas
+            // should be written to separate FlowFiles.
+            final RecordSchema newReadSchema = 
Optional.ofNullable(previousFlowFileState).map(it ->
+                    DataTypeUtils.merge(it.recordSchema, 
outputRecord.getSchema())).orElse(outputRecord.getSchema());
+
+            final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+            outputStream = session.write(flowFile);
+            writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
+            writer.beginRecordSet();
+        } catch (final Exception e) {
+            if (flowFile != null) {
+                session.remove(flowFile);
             }
+            closeSafe(writer, "Record Writer");
+            closeSafe(outputStream, "Output Stream");
+            throw e;
         }
+        return new FlowFileState(flowFile, writer, outputStream, 
outputRecord.getSchema());
+    }
 
-        reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
-
-        final Map<String, String> attributes = 
getDefaultAttributes(lastRecord);
-        attributes.put("record.count", String.valueOf(recordCount));
-        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-        attributes.putAll(writeResult.getAttributes());
-        flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), 
attributes));
+    private void completeFlowFile(final List<FlowFile> flowFiles, final 
ProcessSession session, final StopWatch stopWatch)
+            throws CompleteFlowFileSingleKinesisRecordException {
+        if (flowFileState.isFlowFileEmpty()) {
+            flowFiles.remove(flowFileState.flowFile);
+            session.remove(flowFileState.flowFile);
+            closeSafe(flowFileState, "FlowFile State");
+            return;
+        }
+        try {
+            flowFileState.writer.finishRecordSet();
+            closeSafe(flowFileState, "FlowFile State");
+            reportProvenance(session, flowFileState.flowFile, null, null, 
stopWatch);
 
-        writer = null;
-        outputStream = null;
+            final Map<String, String> attributes = 
getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+            attributes.put("record.count", 
String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), 
flowFileState.writer.getMimeType());
+            
attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+            final int flowFileIndex = 
flowFiles.indexOf(flowFileState.flowFile);
+            flowFiles.set(flowFileIndex, 
session.putAllAttributes(flowFileState.flowFile, attributes));
+        } catch (final IOException e) {
+            flowFiles.remove(flowFileState.flowFile);
+            session.remove(flowFileState.flowFile);
+            closeSafe(flowFileState, "FlowFile State");
+            final String message = "Failed to complete a FlowFile containing 
records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range: 
[%s/%d, %s/%d)".formatted(
+                    getStreamName(),
+                    getKinesisShardId(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+            );
+            final boolean isSingleKinesisRecordInFlowFile = 
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.equals(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+            if (isSingleKinesisRecordInFlowFile) {
+                // in case of a single Kinesis Record in FlowFile, so we can 
route it to failure relationship
+                throw new 
CompleteFlowFileSingleKinesisRecordException(message, e, 
flowFileState.firstSuccessfulWriteInfo.kinesisRecord);
+            }
+            // in case of multiple Kinesis Records the whole batch needs to be 
failed, otherwise data can be lost
+            throw new CompleteFlowFileMultipleKinesisRecordException(message, 
e);

Review Comment:
   To me, the different exception types seem counterintuitive.
   Would it be better to perform this check in the error-handling code instead?
   



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -47,16 +51,19 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class KinesisRecordProcessorRecord extends 
AbstractKinesisRecordProcessor {
     final RecordReaderFactory readerFactory;
     final RecordSetWriterFactory writerFactory;
     final Map<String, String> schemaRetrievalVariables;
 
-    private RecordSetWriter writer;
-    private OutputStream outputStream;
+    private FlowFileState flowFileState;

Review Comment:
   Nit: I reckon a `(current|open)FlowFile` name would better convey the meaning 
of this field.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +177,94 @@ void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kin
         }
     }
 
-    private void createWriter(final FlowFile flowFile, final ProcessSession 
session, final Record outputRecord)
-            throws IOException, SchemaNotFoundException {
-
-        final RecordSchema readerSchema = outputRecord.getSchema();
-        final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
-        outputStream = session.write(flowFile);
-        writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
-        writer.beginRecordSet();
+    private static byte @NotNull [] getData(final KinesisClientRecord 
kinesisRecord) {
+        final ByteBuffer dataBuffer = kinesisRecord.data();
+        final byte[] data = dataBuffer != null ? new 
byte[dataBuffer.remaining()] : new byte[0];
+        if (dataBuffer != null) {
+            dataBuffer.get(data);
+        }
+        return data;
     }
 
-    private void completeFlowFile(final List<FlowFile> flowFiles, final 
ProcessSession session, final int recordCount,
-                                  final WriteResult writeResult, final 
KinesisClientRecord lastRecord, final StopWatch stopWatch)
-            throws IOException {
-
+    /**
+     * Initializes the FlowFile state for the current processing. This 
includes creating a new FlowFile, initializing the RecordSetWriter, and setting 
up the output stream.
+     * In case of an exception during initialization, the FlowFile is removed 
from the session and the resources are closed properly.
+     */
+    private FlowFileState initializeState(final ProcessSession session, final 
Record outputRecord, final FlowFileState previousFlowFileState) throws 
IOException, SchemaNotFoundException {
+        FlowFile flowFile = null;
+        OutputStream outputStream = null;
+        RecordSetWriter writer = null;
         try {
-            writer.finishRecordSet();
-        } catch (IOException e) {
-            getLogger().error("Failed to finish record output due to {}", 
e.getLocalizedMessage(), e);
-            session.remove(flowFiles.get(0));
-            flowFiles.remove(0);
-            throw e;
-        } finally {
-            try {
-                writer.close();
-                outputStream.close();
-            } catch (final IOException e) {
-                getLogger().warn("Failed to close Record Writer due to {}", 
e.getLocalizedMessage(), e);
+            flowFile = session.create();
+            // If we have a previous schema, we need to merge it with the 
current schema to ensure that the writer can handle all fields from both 
schemas going forward.
+            // There is an assumption here that all the records are expected 
by the downstream to have the same-ish schema. It is possible to imagine a 
scenario where records with different schemas
+            // should be written to separate FlowFiles.
+            final RecordSchema newReadSchema = 
Optional.ofNullable(previousFlowFileState).map(it ->
+                    DataTypeUtils.merge(it.recordSchema, 
outputRecord.getSchema())).orElse(outputRecord.getSchema());
+
+            final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+            outputStream = session.write(flowFile);
+            writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
+            writer.beginRecordSet();
+        } catch (final Exception e) {
+            if (flowFile != null) {
+                session.remove(flowFile);
             }
+            closeSafe(writer, "Record Writer");
+            closeSafe(outputStream, "Output Stream");
+            throw e;
         }
+        return new FlowFileState(flowFile, writer, outputStream, 
outputRecord.getSchema());
+    }
 
-        reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
-
-        final Map<String, String> attributes = 
getDefaultAttributes(lastRecord);
-        attributes.put("record.count", String.valueOf(recordCount));
-        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-        attributes.putAll(writeResult.getAttributes());
-        flowFiles.set(0, session.putAllAttributes(flowFiles.get(0), 
attributes));
+    private void completeFlowFile(final List<FlowFile> flowFiles, final 
ProcessSession session, final StopWatch stopWatch)
+            throws CompleteFlowFileSingleKinesisRecordException {
+        if (flowFileState.isFlowFileEmpty()) {
+            flowFiles.remove(flowFileState.flowFile);
+            session.remove(flowFileState.flowFile);
+            closeSafe(flowFileState, "FlowFile State");
+            return;
+        }
+        try {
+            flowFileState.writer.finishRecordSet();
+            closeSafe(flowFileState, "FlowFile State");
+            reportProvenance(session, flowFileState.flowFile, null, null, 
stopWatch);
 
-        writer = null;
-        outputStream = null;
+            final Map<String, String> attributes = 
getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+            attributes.put("record.count", 
String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), 
flowFileState.writer.getMimeType());
+            
attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+            final int flowFileIndex = 
flowFiles.indexOf(flowFileState.flowFile);
+            flowFiles.set(flowFileIndex, 
session.putAllAttributes(flowFileState.flowFile, attributes));
+        } catch (final IOException e) {
+            flowFiles.remove(flowFileState.flowFile);
+            session.remove(flowFileState.flowFile);
+            closeSafe(flowFileState, "FlowFile State");
+            final String message = "Failed to complete a FlowFile containing 
records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range: 
[%s/%d, %s/%d)".formatted(
+                    getStreamName(),
+                    getKinesisShardId(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                    
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+            );
+            final boolean isSingleKinesisRecordInFlowFile = 
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.equals(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);

Review Comment:
   Nit: let's encapsulate the logic of determining how many records there are 
in the current flow file into the `FlowFileState` class.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +82,93 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (flowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(flowFileState, "FlowFile State");
+            flowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (flowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(flowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(flowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (CompleteFlowFileSingleKinesisRecordException e) {
+            final boolean removeFirstFlowFileIfAvailable = true;

Review Comment:
   Nit: that's "current" file now, not "first"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to