awelless commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2175268484


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -99,27 +100,44 @@ void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kin
             while ((intermediateRecord = recordSet.next()) != null) {
                 Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
                 if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
+                    final FlowFile createdFlowFile = session.create();
+                    flowFiles.add(createdFlowFile);
 
                     // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
+                    createWriter(createdFlowFile, session, outputRecord);
+                    emptyFlowFileToRemove = createdFlowFile;
                 }
 
                 final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
-                }
-                firstOutputRecord = false;
+                // definitely no longer empty
+                emptyFlowFileToRemove = null;

Review Comment:
   Nit: it might be simpler to keep `flowFile` and add a `boolean flowFileEmpty` flag to decide whether the file should be removed. Alternatively, we could use `firstSuccessfulWriteInfo` to check whether anything has been written.
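
A minimal sketch of the flag-based variant, for illustration only. It deliberately avoids the NiFi API: `EmptyOutputFlagSketch`, `Result`, `BadRecordException`, and `write` are hypothetical stand-ins, and an empty string plays the role of an unparsable record. The only point is that one boolean, set after the first successful write, is enough to decide whether a freshly created output must be removed when a failure occurs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the processing loop; none of these names are NiFi APIs.
final class EmptyOutputFlagSketch {

    /** Simulates a parse or conversion failure in the stand-in writer. */
    static final class BadRecordException extends Exception {
        BadRecordException(String message) {
            super(message);
        }
    }

    /** Records what the loop did so the outcome can be inspected. */
    static final class Result {
        final List<String> written = new ArrayList<>();
        boolean outputCreated;      // an output was created (stands in for session.create())
        boolean emptyOutputRemoved; // the output was dropped because nothing was written to it
    }

    static Result process(List<String> records) {
        final Result result = new Result();
        boolean outputEmpty = true; // the flag suggested in the review comment
        try {
            for (String record : records) {
                if (!result.outputCreated) {
                    result.outputCreated = true; // output and writer are created lazily
                }
                write(record, result.written);
                outputEmpty = false; // at least one record has been written
            }
        } catch (BadRecordException e) {
            // Remove the output only if it was created and is still empty.
            if (result.outputCreated && outputEmpty) {
                result.emptyOutputRemoved = true;
            }
        }
        return result;
    }

    private static void write(String record, List<String> sink) throws BadRecordException {
        if (record.isEmpty()) {
            throw new BadRecordException("unparsable record");
        }
        sink.add(record);
    }

    public static void main(String[] args) {
        Result onlyBad = process(Arrays.asList(""));
        System.out.println("only bad record, removed = " + onlyBad.emptyOutputRemoved);
        Result goodThenBad = process(Arrays.asList("a", ""));
        System.out.println("good then bad, removed = " + goodThenBad.emptyOutputRemoved);
    }
}
```

The alternative from the comment would replace the flag with a check on whether a first successful write has been recorded; whether that fits depends on when that field is reset, which this sketch does not model.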



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -99,27 +100,44 @@ void processRecord(final List<FlowFile> flowFiles, final KinesisClientRecord kin
             while ((intermediateRecord = recordSet.next()) != null) {
                 Record outputRecord = recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), getKinesisShardId());
                 if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
+                    final FlowFile createdFlowFile = session.create();
+                    flowFiles.add(createdFlowFile);
 
                     // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
+                    createWriter(createdFlowFile, session, outputRecord);
+                    emptyFlowFileToRemove = createdFlowFile;
                 }
 
                 final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, writeResult, kinesisRecord, stopWatch);
-                }
-                firstOutputRecord = false;
+                // definitely no longer empty
+                emptyFlowFileToRemove = null;
+                firstSuccessfulWriteInfo = firstSuccessfulWriteInfo == null
+                        ? new SuccessfulWriteInfo(kinesisRecord, writeResult)
+                        : firstSuccessfulWriteInfo;
+                lastSuccessfulWriteInfo = new SuccessfulWriteInfo(kinesisRecord, writeResult);
             }
-        } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
+        } catch (final MalformedRecordException | IOException | SchemaNotFoundException | IllegalTypeConversionException e) {
             // write raw Kinesis Record to the parse failure relationship
             getLogger().error("Failed to parse message from Kinesis Stream using configured Record Reader and Writer due to {}",
                     e.getLocalizedMessage(), e);
-            outputRawRecordOnException(firstOutputRecord, flowFile, flowFiles, session, data, kinesisRecord, e);
+            outputRawRecordOnException(emptyFlowFileToRemove, flowFiles, session, data, kinesisRecord, e);
+        }
+
+        // complete the FlowFile if there are no more incoming Kinesis Records and no more records in this RecordSet
+        if (lastRecord && !flowFiles.isEmpty()) {
+            try {
+                completeFlowFile(flowFiles, session, lastSuccessfulWriteInfo.writeResult.getRecordCount(), lastSuccessfulWriteInfo.writeResult, lastSuccessfulWriteInfo.kinesisRecord, stopWatch);
+            } catch (IOException e) {
+                getLogger().error("Failed to complete a FlowFile, dropped records from stream: {}, shardId: {}, sequence number range: [{}, {}], due to {}",
+                        getStreamName(),
+                        getKinesisShardId(),
+                        firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                        lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+                        e.getLocalizedMessage(),
+                        e);
+            }
+            firstSuccessfulWriteInfo = null;

Review Comment:
   Previously, `outputRawRecordOnException` was called when `completeFlowFile` threw any error. Now that completion has been moved out of the `try` block, should we preserve that error handling here?
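
One possible shape for keeping a fallback when completion fails, sketched without the NiFi API. `CompletionFallbackSketch`, `Completer`, and `Outcome` are hypothetical names, and routing every buffered record to the failure list is an assumption made for the example; the comment leaves open which data should be routed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in; it only illustrates "fall back instead of only logging".
final class CompletionFallbackSketch {

    /** Stand-in for the step that finishes the output and may fail with an IOException. */
    interface Completer {
        void complete(List<String> records) throws IOException;
    }

    /** Where the records ended up after the completion attempt. */
    static final class Outcome {
        final List<String> success = new ArrayList<>();
        final List<String> parseFailure = new ArrayList<>();
    }

    static Outcome completeOrFallBack(List<String> records, Completer completer) {
        final Outcome outcome = new Outcome();
        try {
            completer.complete(records);
            outcome.success.addAll(records);
        } catch (IOException e) {
            // Assumption: on a failed completion the affected records go to the
            // failure path, mirroring the raw-output fallback used for parse errors.
            outcome.parseFailure.addAll(records);
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome failed = completeOrFallBack(Arrays.asList("a", "b"), records -> {
            throw new IOException("simulated completion failure");
        });
        System.out.println("routed to failure: " + failed.parseFailure.size());
    }
}
```

Compared with logging alone, this keeps the data visible downstream, at the cost of deciding how partially written output should be cleaned up.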



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java:
##########
@@ -343,6 +347,23 @@ public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() throws Shut
         session.assertNotRolledBack();
     }
 
+    @Test
+    public void testProcessUnparsableRecordWithRawOutputWithCheckpoint_lastRecordInvalid() throws ShutdownException, InvalidStateException {

Review Comment:
   Nit: a single `@ParameterizedTest` with `@MethodSource`, replacing `testProcessUnparsableRecordWithRawOutputWithCheckpoint` and `testProcessUnparsableRecordWithRawOutputWithCheckpoint_lastRecordInvalid`, should make the tests clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.