dariuszseweryn commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2191036993


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final 
ProcessSessionFactory sessionFactory,
     @Override
     void startProcessingRecords() {
         super.startProcessingRecords();
-        outputStream = null;
-        writer = null;
+        if (currentFlowFileState != null) {
+            getLogger().warn("FlowFile State is not null at the start of 
processing records, this is not expected.");
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
+        }
+    }
+
+    @Override
+    void finishProcessingRecords(final ProcessSession session, final 
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+        super.finishProcessingRecords(session, flowFiles, stopWatch);
+        try {
+            if (currentFlowFileState == null) {
+                return;
+            }
+            if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+                getLogger().warn("Currently processed FlowFile is no longer 
available at processing end, this is not expected.", flowFiles);
+                closeSafe(currentFlowFileState, "FlowFile State");
+                return;
+            }
+            completeFlowFile(flowFiles, session, stopWatch);
+        } catch (final FlowFileCompletionException e) {
+            if 
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+                throw new KinesisBatchUnrecoverableException(e.getMessage(), 
e);
+            }
+            final boolean removeCurrentStateFlowFileIfAvailable = true;
+            final KinesisClientRecord kinesisRecord = 
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+            final byte[] data = getData(kinesisRecord);
+            outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable, 
flowFiles, session, data, kinesisRecord, e);
+        } finally {
+            currentFlowFileState = null;
+        }
     }
 
     @Override
-    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+    void processRecord(final List<FlowFile> flowFiles, final 
KinesisClientRecord kinesisRecord,
                        final ProcessSession session, final StopWatch 
stopWatch) {
-        boolean firstOutputRecord = true;
-        int recordCount = 0;
-        final ByteBuffer dataBuffer = kinesisRecord.data();
-        byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] : 
new byte[0];
-        if (dataBuffer != null) {
-            dataBuffer.get(data);
+        if (currentFlowFileState != null && 
!flowFiles.contains(currentFlowFileState.flowFile)) {
+            getLogger().warn("Currently processed FlowFile is no longer 
available, this is not expected.", flowFiles);
+            closeSafe(currentFlowFileState, "FlowFile State");
+            currentFlowFileState = null;
         }
 
-        FlowFile flowFile = null;
+        final byte[] data = getData(kinesisRecord);
+
         try (final InputStream in = new ByteArrayInputStream(data);
              final RecordReader reader = 
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length, 
getLogger())
         ) {
             Record intermediateRecord;
             final PushBackRecordSet recordSet = new 
PushBackRecordSet(reader.createRecordSet());
             while ((intermediateRecord = recordSet.next()) != null) {
-                Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
-                if (flowFiles.isEmpty()) {
-                    flowFile = session.create();
-                    flowFiles.add(flowFile);
-
-                    // initialize the writer when the first record is read.
-                    createWriter(flowFile, session, outputRecord);
+                final Record outputRecord = 
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(), 
getKinesisShardId());
+                if (currentFlowFileState == null) {
+                    // writer schema is determined by some, usually the first 
record
+                    currentFlowFileState = initializeState(session, 
outputRecord, null);
+                    flowFiles.add(currentFlowFileState.flowFile);
                 }
 
-                final WriteResult writeResult = writer.write(outputRecord);
-                recordCount += writeResult.getRecordCount();
-
-                // complete the FlowFile if there are no more incoming Kinesis 
Records and no more records in this RecordSet
-                if (lastRecord && !recordSet.isAnotherRecord()) {
-                    completeFlowFile(flowFiles, session, recordCount, 
writeResult, kinesisRecord, stopWatch);
+                if 
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema, 
outputRecord, false)) {
+                    // subsequent records may have different schema. if so, 
try complete current FlowFile and start a new one with wider schema to continue

Review Comment:
   Should all the record readers default to biggest number representations when 
inferring schema?
   
   Or you suggest that RecordSchema/RecordReader would expose information if it 
was inferred and then the processor should do two passes over the record batch? 
First for creating a schema for all records and second for actual serialization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to