exceptionfactory commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2206163020
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +77,79 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ // this may happen if the previous processing has not been
completed successfully, close the leftover state
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ // this is unexpected, flowFiles have been altered not in this
class after the start of processing
+ throw new IllegalStateException("%s is not available in
provided FlowFiles [%d]".formatted(currentFlowFileState.flowFile,
flowFiles.size()));
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
+ }
+ final boolean removeCurrentStateFlowFileIfAvailable = true;
Review Comment:
If this is always `true`, it would be better to set it as a static final
variable instead of a method local variable.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +77,79 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ // this may happen if the previous processing has not been
completed successfully, close the leftover state
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ // this is unexpected, flowFiles have been altered not in this
class after the start of processing
+ throw new IllegalStateException("%s is not available in
provided FlowFiles [%d]".formatted(currentFlowFileState.flowFile,
flowFiles.size()));
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
Review Comment:
This should include a specific message, such as "FlowFile does not contain
data from exactly one Kinesis Record"
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
+ }
+ final boolean removeCurrentStateFlowFileIfAvailable = true;
+ final KinesisClientRecord kinesisRecord =
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+ final byte[] data = getData(kinesisRecord);
+ outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable,
flowFiles, session, data, kinesisRecord, e);
+ } finally {
+ currentFlowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
- final ByteBuffer dataBuffer = kinesisRecord.data();
- byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] :
new byte[0];
- if (dataBuffer != null) {
- dataBuffer.get(data);
+ if (currentFlowFileState != null &&
!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
}
- FlowFile flowFile = null;
+ final byte[] data = getData(kinesisRecord);
+
try (final InputStream in = new ByteArrayInputStream(data);
final RecordReader reader =
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length,
getLogger())
) {
Record intermediateRecord;
final PushBackRecordSet recordSet = new
PushBackRecordSet(reader.createRecordSet());
while ((intermediateRecord = recordSet.next()) != null) {
- Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
- if (flowFiles.isEmpty()) {
- flowFile = session.create();
- flowFiles.add(flowFile);
-
- // initialize the writer when the first record is read.
- createWriter(flowFile, session, outputRecord);
+ final Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
+ if (currentFlowFileState == null) {
+ // writer schema is determined by some, usually the first
record
+ currentFlowFileState = initializeState(session,
outputRecord, null);
+ flowFiles.add(currentFlowFileState.flowFile);
}
- final WriteResult writeResult = writer.write(outputRecord);
- recordCount += writeResult.getRecordCount();
-
- // complete the FlowFile if there are no more incoming Kinesis
Records and no more records in this RecordSet
- if (lastRecord && !recordSet.isAnotherRecord()) {
- completeFlowFile(flowFiles, session, recordCount,
writeResult, kinesisRecord, stopWatch);
+ if
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema,
outputRecord, false)) {
Review Comment:
Thanks for outlining the issues and describing the potential options.
Each Record Writer is responsible for its own FlowFile writing behavior. For
this reason, the Processor does not appear to be the best place to resolve the
issue. The JSON Record Set Writer may behave one way, but other Record
Writers may be more lenient in certain cases.
In this case, it does not seem like the Processor should introduce schema
compatibility checking, as that is the responsibility of the Record Writer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]