exceptionfactory commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2190933519
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.java:
##########
@@ -177,12 +182,15 @@ private int processRecordsWithRetries(final
List<KinesisClientRecord> records, f
return recordsTransformed;
}
- private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ private boolean attemptProcessRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final
StopWatch stopWatch) {
boolean processedSuccessfully = false;
try {
- processRecord(flowFiles, kinesisRecord, lastRecord, session,
stopWatch);
+ processRecord(flowFiles, kinesisRecord, session, stopWatch);
processedSuccessfully = true;
+ } catch (final KinesisBatchUnrecoverableException e) {
+ // don't attempt retry, rethrow
Review Comment:
Recommend removing this comment since it simply describes the behavior, and
the exception name itself indicates the reason.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -82,15 +85,18 @@ void startProcessingRecords() {
@Override
void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
+ if (flowFiles.size() > 1) {
+ // historically this code has assumed that the FlowFile it
operates on is the first one in the list.
+ // changing this behavior would exceed the scope of the bugfix
change this comment was added in.
+ // leaving this comment to inform future maintainers that
assumption is rather weak and should be refactored
+ getLogger().warn("More than one FlowFile is being processed at
once, this is not expected.", flowFiles);
+ }
Review Comment:
Instead of introducing this log and message, please create a Jira issue for
tracking.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
+ }
+ final boolean removeCurrentStateFlowFileIfAvailable = true;
+ final KinesisClientRecord kinesisRecord =
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+ final byte[] data = getData(kinesisRecord);
+ outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable,
flowFiles, session, data, kinesisRecord, e);
+ } finally {
+ currentFlowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
- final ByteBuffer dataBuffer = kinesisRecord.data();
- byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] :
new byte[0];
- if (dataBuffer != null) {
- dataBuffer.get(data);
+ if (currentFlowFileState != null &&
!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
}
- FlowFile flowFile = null;
+ final byte[] data = getData(kinesisRecord);
+
try (final InputStream in = new ByteArrayInputStream(data);
final RecordReader reader =
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length,
getLogger())
) {
Record intermediateRecord;
final PushBackRecordSet recordSet = new
PushBackRecordSet(reader.createRecordSet());
while ((intermediateRecord = recordSet.next()) != null) {
- Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
- if (flowFiles.isEmpty()) {
- flowFile = session.create();
- flowFiles.add(flowFile);
-
- // initialize the writer when the first record is read.
- createWriter(flowFile, session, outputRecord);
+ final Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
+ if (currentFlowFileState == null) {
+ // writer schema is determined by some, usually the first
record
+ currentFlowFileState = initializeState(session,
outputRecord, null);
+ flowFiles.add(currentFlowFileState.flowFile);
}
- final WriteResult writeResult = writer.write(outputRecord);
- recordCount += writeResult.getRecordCount();
-
- // complete the FlowFile if there are no more incoming Kinesis
Records and no more records in this RecordSet
- if (lastRecord && !recordSet.isAnotherRecord()) {
- completeFlowFile(flowFiles, session, recordCount,
writeResult, kinesisRecord, stopWatch);
+ if
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema,
outputRecord, false)) {
+ // subsequent records may have different schema. if so,
try complete current FlowFile and start a new one with wider schema to continue
Review Comment:
The comment raises a question about the implementation approach. Schema
changes should either work with an inferred schema, or should throw an
exception because of a schema mismatch.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/record/TestKinesisRecordProcessorRecord.java:
##########
@@ -284,22 +290,63 @@ public void
testProcessPoisonPillRecordButNoRawOutputWithCheckpoint() throws Shu
session.assertNotRolledBack();
}
- @Test
- public void testProcessUnparsableRecordWithRawOutputWithCheckpoint()
throws ShutdownException, InvalidStateException {
+ private static Stream<Arguments> unparsableRecordsLists() {
+ final KinesisClientRecord unparsableRecordMock =
mock(KinesisClientRecord.class);
+ return Stream.of(
+ Arguments.argumentSet("Unparsable At The Beginning",
+ Arrays.asList(
+ unparsableRecordMock,
+
KinesisClientRecord.builder().approximateArrivalTimestamp(null)
+ .partitionKey("partition-1")
+ .sequenceNumber("1")
+
.data(ByteBuffer.wrap("{\"record\":\"1\"}".getBytes(StandardCharsets.UTF_8)))
Review Comment:
It looks like the data content for each of these examples could be declared
statically and reused.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
+ }
+ final boolean removeCurrentStateFlowFileIfAvailable = true;
+ final KinesisClientRecord kinesisRecord =
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+ final byte[] data = getData(kinesisRecord);
+ outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable,
flowFiles, session, data, kinesisRecord, e);
+ } finally {
+ currentFlowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
- final ByteBuffer dataBuffer = kinesisRecord.data();
- byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] :
new byte[0];
- if (dataBuffer != null) {
- dataBuffer.get(data);
+ if (currentFlowFileState != null &&
!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
}
- FlowFile flowFile = null;
+ final byte[] data = getData(kinesisRecord);
+
try (final InputStream in = new ByteArrayInputStream(data);
final RecordReader reader =
readerFactory.createRecordReader(schemaRetrievalVariables, in, data.length,
getLogger())
) {
Record intermediateRecord;
final PushBackRecordSet recordSet = new
PushBackRecordSet(reader.createRecordSet());
while ((intermediateRecord = recordSet.next()) != null) {
- Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
- if (flowFiles.isEmpty()) {
- flowFile = session.create();
- flowFiles.add(flowFile);
-
- // initialize the writer when the first record is read.
- createWriter(flowFile, session, outputRecord);
+ final Record outputRecord =
recordConverter.convert(intermediateRecord, kinesisRecord, getStreamName(),
getKinesisShardId());
+ if (currentFlowFileState == null) {
+ // writer schema is determined by some, usually the first
record
+ currentFlowFileState = initializeState(session,
outputRecord, null);
+ flowFiles.add(currentFlowFileState.flowFile);
}
- final WriteResult writeResult = writer.write(outputRecord);
- recordCount += writeResult.getRecordCount();
-
- // complete the FlowFile if there are no more incoming Kinesis
Records and no more records in this RecordSet
- if (lastRecord && !recordSet.isAnotherRecord()) {
- completeFlowFile(flowFiles, session, recordCount,
writeResult, kinesisRecord, stopWatch);
+ if
(!DataTypeUtils.isRecordTypeCompatible(currentFlowFileState.recordSchema,
outputRecord, false)) {
Review Comment:
Although the explicit check is an option, is there a reason for this
approach as opposed to catching exceptions when the type is not compatible?
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +166,88 @@ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kin
}
}
- private void createWriter(final FlowFile flowFile, final ProcessSession
session, final Record outputRecord)
- throws IOException, SchemaNotFoundException {
-
- final RecordSchema readerSchema = outputRecord.getSchema();
- final RecordSchema writeSchema =
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
- outputStream = session.write(flowFile);
- writer = writerFactory.createWriter(getLogger(), writeSchema,
outputStream, flowFile);
- writer.beginRecordSet();
+ private static byte @NotNull [] getData(final KinesisClientRecord
kinesisRecord) {
Review Comment:
```suggestion
private static byte [] getData(final KinesisClientRecord kinesisRecord) {
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
Review Comment:
The `flowFiles` argument is not included as a parameter in this warning log
message. Recommend logging the specified FlowFile and just the number of others.
However, can this occur? Instead of logging a warning, should an
`IllegalStateException` be thrown instead?
```suggestion
getLogger().warn("{} is not available in provided FlowFiles
[{}]", currentFlowFileState.flowFile, flowFiles.size());
```
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -34,10 +34,14 @@
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.apache.nifi.util.StopWatch;
+import org.jetbrains.annotations.NotNull;
Review Comment:
The `NotNull` annotation from JetBrains should generally be avoided given
the lack of the explicit dependency.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +79,85 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (currentFlowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(currentFlowFileState, "FlowFile State");
+ currentFlowFileState = null;
+ }
+ }
+
+ @Override
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (currentFlowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(currentFlowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (final FlowFileCompletionException e) {
+ if
(!currentFlowFileState.containsDataFromExactlyOneKinesisRecord()) {
+ throw new KinesisBatchUnrecoverableException(e.getMessage(),
e);
+ }
+ final boolean removeCurrentStateFlowFileIfAvailable = true;
+ final KinesisClientRecord kinesisRecord =
currentFlowFileState.lastSuccessfulWriteInfo.kinesisRecord;
+ final byte[] data = getData(kinesisRecord);
+ outputRawRecordOnException(removeCurrentStateFlowFileIfAvailable,
flowFiles, session, data, kinesisRecord, e);
+ } finally {
+ currentFlowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
- final ByteBuffer dataBuffer = kinesisRecord.data();
- byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] :
new byte[0];
- if (dataBuffer != null) {
- dataBuffer.get(data);
+ if (currentFlowFileState != null &&
!flowFiles.contains(currentFlowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available, this is not expected.", flowFiles);
Review Comment:
See note above on similar behavior.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -197,7 +261,64 @@ private void outputRawRecordOnException(final boolean
firstOutputRecord, final F
private Map<String, String> getDefaultAttributes(final KinesisClientRecord
kinesisRecord) {
final String partitionKey = kinesisRecord.partitionKey();
final String sequenceNumber = kinesisRecord.sequenceNumber();
+ final long subSequenceNumber = kinesisRecord.subSequenceNumber();
final Instant approximateArrivalTimestamp =
kinesisRecord.approximateArrivalTimestamp();
- return getDefaultAttributes(sequenceNumber, partitionKey,
approximateArrivalTimestamp);
+ return getDefaultAttributes(sequenceNumber, subSequenceNumber,
partitionKey, approximateArrivalTimestamp);
+ }
+
+ void closeSafe(final Closeable closeable, final String closeableName) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ getLogger().warn("Failed to close {} due to {}",
closeableName, e.getLocalizedMessage(), e);
Review Comment:
It is not necessary to repeat the exception message since it is included in
the stack trace of the cause.
```suggestion
getLogger().warn("Failed to close {}", closeableName, e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]