exceptionfactory commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2190933519


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java:
##########
@@ -177,12 +182,15 @@ private int processRecordsWithRetries(final 
List<KinesisClientRecord> records, f
         return recordsTransformed;
     }
 
-    private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                                          final ProcessSession session, final 
StopWatch stopWatch) {
         boolean processedSuccessfully = false;
         try {
-            processRecord(flowFiles, kinesisRecord, lastRecord, session, 
stopWatch);
+            processRecord(flowFiles, kinesisRecord, session, stopWatch);
             processedSuccessfully = true;
+        } catch (final KinesisBatchUnrecoverableException e) {
+            // don't attempt retry, rethrow

Review Comment:
   Recommend removing this comment since it simply describes the behavior, and 
the exception name itself indicates the reason.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -82,15 +85,18 @@ void startProcessingRecords() {
     @Override
     void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
+        if (flowFiles.size() > 1) {
+            // historically this code has assumed that the FlowFile it 
operates on is the first one in the list.
+            // changing this behavior would exceed the scope of the bugfix 
change this comment was added in.
+            // leaving this comment to inform future maintainers that 
assumption is rather weak and should be refactored
+            getLogger().warn("More than one FlowFile is being processed at 
once, this is not expected.", flowFiles);
+        }

Review Comment:
   Instead of introducing this log and message, please create a Jira issue for 
tracking.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (currentFlowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
+        }
+    }
+
+    @Override
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (currentFlowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(currentFlowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (final FlowFileCompletionException e) {
+            if 
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+                throw new KinesisBatchUnrecoverableException(e.getMessage(), 
e);
+            }
+            final boolean removeCurrentStateFlowFileIfAvailable = true;
+            final KinesisClientRecord kinesisRecord = 
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+            final byte[] data = getData(kinesisRecord);
+            outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable, 
flowFiles, session, data, kinesisRecord, e);
+        } finally {
+            currentFlowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+        if (currentFlowFileState != null && 
!flowFiles.contains(currentFlowFileState.flowFile)) {
+            getLogger().warn("Currently processed FlowFile is no longer 
available, this is not expected.", flowFiles);
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
         }
 
-        FlowFile flowFile = null;
+        final byte[] data = getData(kinesisRecord);
+
         try (final InputStream in = new ByteArrayInputStream(data);
              final RecordReader reader = 
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, 
getLogger())
         ) {
             Record intermediateRecord;
             final PushBackRecordSet recordSet = new 
PushBackRecordSet(reader.createRecordSet());
             while ((intermediateRecord = recordSet.next()) != null) {
-                Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
-                if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
-
-                    // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
+                final Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
+                if (currentFlowFileState == null) {
+                    // writer schema is determined by some, usually the first 
record
+                    currentFlowFileState = initializeState(session, 
outputRecord, null);
+                    flowFiles.add(currentFlowFileState.flowFile);
                 }
 
-                final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis 
Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, 
writeResult, kinesisRecord, stopWatch);
+                if 
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema, 
outputRecord, false)) {
+                    // subsequent records may have different schema. if so, 
try complete current FlowFile and start a new one with wider schema to continue

Review Comment:
   The comment raises a question about the implementation approach. Schema 
changes should either work with an inferred schema, or should throw an 
exception because of a schema mismatch.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java:
##########
@@ -284,22 +290,63 @@ public void 
testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() throws Shu
         session.assertNotRolledBack();
     }
 
-    @Test
-    public void testProcessUnparsableRecordWithRawOutputWithCheckpoint() 
throws ShutdownException, InvalidStateException {
+    private static Stream<Arguments> unparsableRecordsLists() {
+        final KinesisClientRecord unparsableRecordMock = 
mock(KinesisClientRecord.class);
+        return Stream.of(
+                Arguments.argumentSet("Unparsable At The Beginning",
+                        Arrays.asList(
+                                unparsableRecordMock,
+                                
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
+                                        .partitionKey("partition-1")
+                                        .sequenceNumber("1")
+                                        
.data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))

Review Comment:
   It looks like the data content for each of these examples could be declared 
statically and reused.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (currentFlowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
+        }
+    }
+
+    @Override
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (currentFlowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(currentFlowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (final FlowFileCompletionException e) {
+            if 
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+                throw new KinesisBatchUnrecoverableException(e.getMessage(), 
e);
+            }
+            final boolean removeCurrentStateFlowFileIfAvailable = true;
+            final KinesisClientRecord kinesisRecord = 
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+            final byte[] data = getData(kinesisRecord);
+            outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable, 
flowFiles, session, data, kinesisRecord, e);
+        } finally {
+            currentFlowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+        if (currentFlowFileState != null && 
!flowFiles.contains(currentFlowFileState.flowFile)) {
+            getLogger().warn("Currently processed FlowFile is no longer 
available, this is not expected.", flowFiles);
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
         }
 
-        FlowFile flowFile = null;
+        final byte[] data = getData(kinesisRecord);
+
         try (final InputStream in = new ByteArrayInputStream(data);
              final RecordReader reader = 
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, 
getLogger())
         ) {
             Record intermediateRecord;
             final PushBackRecordSet recordSet = new 
PushBackRecordSet(reader.createRecordSet());
             while ((intermediateRecord = recordSet.next()) != null) {
-                Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
-                if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
-
-                    // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
+                final Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
+                if (currentFlowFileState == null) {
+                    // writer schema is determined by some, usually the first 
record
+                    currentFlowFileState = initializeState(session, 
outputRecord, null);
+                    flowFiles.add(currentFlowFileState.flowFile);
                 }
 
-                final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis 
Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, 
writeResult, kinesisRecord, stopWatch);
+                if 
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema, 
outputRecord, false)) {

Review Comment:
   Although the explicit check is an option, is there a reason for this 
approach as opposed to catching exceptions when the type is not compatible?



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +166,88 @@ void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kin
         }
     }
 
-    private void createWriter(final FlowFile flowFile, final ProcessSession 
session, final Record outputRecord)
-            throws IOException, SchemaNotFoundException {
-
-        final RecordSchema readerSchema = outputRecord.getSchema();
-        final RecordSchema writeSchema = 
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
-        outputStream = session.write(flowFile);
-        writer = writerFactory.createWriter(getLogger(), writeSchema, 
outputStream, flowFile);
-        writer.beginRecordSet();
+    private static byte @NotNull [] getData(final KinesisClientRecord 
kinesisRecord) {

Review Comment:
   ```suggestion
       private static byte [] getData(final KinesisClientRecord kinesisRecord) {
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (currentFlowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
+        }
+    }
+
+    @Override
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (currentFlowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);

Review Comment:
   The `flowFiles` argument is not included as a parameter in this warning log 
message. Recommend logging the specified FlowFile and just the number of others.
   
   However, can this occur? Instead of logging a warning, should an 
`IllegalStateException` be thrown instead?
   
   ```suggestion
                   getLogger().warn("{} is not available in provided FlowFiles 
[{}]", currentFlowFileState.flowFile, flowFiles.size());
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -34,10 +34,14 @@
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.apache.nifi.util.StopWatch;
+import org.jetbrains.annotations.NotNull;

Review Comment:
   The `NotNull` annotation from JetBrains should generally be avoided given 
the lack of the explicit dependency.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (currentFlowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
+        }
+    }
+
+    @Override
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (currentFlowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(currentFlowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (final FlowFileCompletionException e) {
+            if 
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+                throw new KinesisBatchUnrecoverableException(e.getMessage(), 
e);
+            }
+            final boolean removeCurrentStateFlowFileIfAvailable = true;
+            final KinesisClientRecord kinesisRecord = 
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+            final byte[] data = getData(kinesisRecord);
+            outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable, 
flowFiles, session, data, kinesisRecord, e);
+        } finally {
+            currentFlowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+        if (currentFlowFileState != null && 
!flowFiles.contains(currentFlowFileState.flowFile)) {
+            getLogger().warn("Currently processed FlowFile is no longer 
available, this is not expected.", flowFiles);

Review Comment:
   See note above on similar behavior.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -197,7 +261,64 @@ private void outputRawRecordOnException(final boolean 
firstOutputRecord, final F
     private Map<String, String> getDefaultAttributes(final KinesisClientRecord 
kinesisRecord) {
         final String partitionKey = kinesisRecord.partitionKey();
         final String sequenceNumber = kinesisRecord.sequenceNumber();
+        final long subSequenceNumber = kinesisRecord.subSequenceNumber();
         final Instant approximateArrivalTimestamp = 
kinesisRecord.approximateArrivalTimestamp();
-        return getDefaultAttributes(sequenceNumber, partitionKey, 
approximateArrivalTimestamp);
+        return getDefaultAttributes(sequenceNumber, subSequenceNumber, 
partitionKey, approximateArrivalTimestamp);
+    }
+
+    void closeSafe(final Closeable closeable, final String closeableName) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                getLogger().warn("Failed to close {} due to {}", 
closeableName, e.getLocalizedMessage(), e);

Review Comment:
   It is not necessary to repeat the exception message since it is included in 
the stack trace of the cause.
   ```suggestion
                   getLogger().warn("Failed to close {}", closeableName, e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to