awelless commented on code in PR #10053:
URL: https://github.com/apache/nifi/pull/10053#discussion_r2185390712
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -197,7 +278,75 @@ private void outputRawRecordOnException(final boolean
firstOutputRecord, final F
private Map<String, String> getDefaultAttributes(final KinesisClientRecord
kinesisRecord) {
final String partitionKey = kinesisRecord.partitionKey();
final String sequenceNumber = kinesisRecord.sequenceNumber();
+ final long subSequenceNumber = kinesisRecord.subSequenceNumber();
final Instant approximateArrivalTimestamp =
kinesisRecord.approximateArrivalTimestamp();
- return getDefaultAttributes(sequenceNumber, partitionKey,
approximateArrivalTimestamp);
+ return getDefaultAttributes(sequenceNumber, subSequenceNumber,
partitionKey, approximateArrivalTimestamp);
+ }
+
+ void closeSafe(final Closeable closeable, final String closeableName) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ getLogger().warn("Failed to close {} due to {}",
closeableName, e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ private record SuccessfulWriteInfo(KinesisClientRecord kinesisRecord,
WriteResult writeResult) { }
+
+ private class FlowFileState implements Closeable {
Review Comment:
Nit: would `SchematizedFlowFile` or `FlowFileWithSchema` better represent the
idea behind this class?
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +82,93 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (flowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(flowFileState, "FlowFile State");
+ flowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (flowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(flowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(flowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (CompleteFlowFileSingleKinesisRecordException e) {
+ final boolean removeFirstFlowFileIfAvailable = true;
+ final KinesisClientRecord kinesisRecord = e.kinesisClientRecord;
+ final byte[] data = getData(kinesisRecord);
+ outputRawRecordOnException(removeFirstFlowFileIfAvailable,
flowFiles, session, data, kinesisRecord, e);
+ } finally {
+ flowFileState = null;
+ failedKinesisRecordCausingDataLoss = null;
+ failedKinesisRecordCausingDataLossException = null;
+ }
+ }
+
+ @Override
+ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord,
final ProcessSession session, final StopWatch
stopWatch) {
- boolean firstOutputRecord = true;
- int recordCount = 0;
- final ByteBuffer dataBuffer = kinesisRecord.data();
- byte[] data = dataBuffer != null ? new byte[dataBuffer.remaining()] :
new byte[0];
- if (dataBuffer != null) {
- dataBuffer.get(data);
+ if (flowFileState != null &&
!flowFiles.contains(flowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available, this is not expected.", flowFiles);
+ closeSafe(flowFileState, "FlowFile State");
+ flowFileState = null;
}
+ if (kinesisRecord == failedKinesisRecordCausingDataLoss) {
+ // AbstractKinesisRecordProcessor does retry processing of failed
records. however in case of CompleteFlowFileMultipleKinesisRecordException it
is impossible to determine the state of
+ // the affected FlowFile and replay all records that were in it.
To prevent data loss, the exception needs to be rethrown until it is given up
by the abstract processor. still there may be
+ // other cases where we end up in undetermined state.
+ throw failedKinesisRecordCausingDataLossException;
Review Comment:
Would it be possible to adjust the retrying logic instead? By distinguishing
between retriable and persistent errors, we could avoid unnecessary retries in
`AbstractKinesisRecordProcessor`.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +177,94 @@ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kin
}
}
- private void createWriter(final FlowFile flowFile, final ProcessSession
session, final Record outputRecord)
- throws IOException, SchemaNotFoundException {
-
- final RecordSchema readerSchema = outputRecord.getSchema();
- final RecordSchema writeSchema =
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
- outputStream = session.write(flowFile);
- writer = writerFactory.createWriter(getLogger(), writeSchema,
outputStream, flowFile);
- writer.beginRecordSet();
+ private static byte @NotNull [] getData(final KinesisClientRecord
kinesisRecord) {
+ final ByteBuffer dataBuffer = kinesisRecord.data();
+ final byte[] data = dataBuffer != null ? new
byte[dataBuffer.remaining()] : new byte[0];
+ if (dataBuffer != null) {
+ dataBuffer.get(data);
+ }
+ return data;
}
- private void completeFlowFile(final List<FlowFile> flowFiles, final
ProcessSession session, final int recordCount,
- final WriteResult writeResult, final
KinesisClientRecord lastRecord, final StopWatch stopWatch)
- throws IOException {
-
+ /**
+ * Initializes the FlowFile state for the current processing. This
includes creating a new FlowFile, initializing the RecordSetWriter, and setting
up the output stream.
+ * In case of an exception during initialization, the FlowFile is removed
from the session and the resources are closed properly.
+ */
+ private FlowFileState initializeState(final ProcessSession session, final
Record outputRecord, final FlowFileState previousFlowFileState) throws
IOException, SchemaNotFoundException {
+ FlowFile flowFile = null;
+ OutputStream outputStream = null;
+ RecordSetWriter writer = null;
try {
- writer.finishRecordSet();
- } catch (IOException e) {
- getLogger().error("Failed to finish record output due to {}",
e.getLocalizedMessage(), e);
- session.remove(flowFiles.get(0));
- flowFiles.remove(0);
- throw e;
- } finally {
- try {
- writer.close();
- outputStream.close();
- } catch (final IOException e) {
- getLogger().warn("Failed to close Record Writer due to {}",
e.getLocalizedMessage(), e);
+ flowFile = session.create();
+ // If we have a previous schema, we need to merge it with the
current schema to ensure that the writer can handle all fields from both
schemas going forward.
+ // There is an assumption here that all the records are expected
by the downstream to have the same-ish schema. It is possible to imagine a
scenario where records with different schemas
+ // should be written to separate FlowFiles.
+ final RecordSchema newReadSchema =
Optional.ofNullable(previousFlowFileState).map(it ->
+ DataTypeUtils.merge(it.recordSchema,
outputRecord.getSchema())).orElse(outputRecord.getSchema());
+
+ final RecordSchema writeSchema =
writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+ outputStream = session.write(flowFile);
+ writer = writerFactory.createWriter(getLogger(), writeSchema,
outputStream, flowFile);
+ writer.beginRecordSet();
+ } catch (final Exception e) {
+ if (flowFile != null) {
+ session.remove(flowFile);
}
+ closeSafe(writer, "Record Writer");
+ closeSafe(outputStream, "Output Stream");
+ throw e;
}
+ return new FlowFileState(flowFile, writer, outputStream,
outputRecord.getSchema());
+ }
- reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
-
- final Map<String, String> attributes =
getDefaultAttributes(lastRecord);
- attributes.put("record.count", String.valueOf(recordCount));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- flowFiles.set(0, session.putAllAttributes(flowFiles.get(0),
attributes));
+ private void completeFlowFile(final List<FlowFile> flowFiles, final
ProcessSession session, final StopWatch stopWatch)
+ throws CompleteFlowFileSingleKinesisRecordException {
+ if (flowFileState.isFlowFileEmpty()) {
+ flowFiles.remove(flowFileState.flowFile);
+ session.remove(flowFileState.flowFile);
+ closeSafe(flowFileState, "FlowFile State");
+ return;
+ }
+ try {
+ flowFileState.writer.finishRecordSet();
+ closeSafe(flowFileState, "FlowFile State");
+ reportProvenance(session, flowFileState.flowFile, null, null,
stopWatch);
- writer = null;
- outputStream = null;
+ final Map<String, String> attributes =
getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+ attributes.put("record.count",
String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
flowFileState.writer.getMimeType());
+
attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+ final int flowFileIndex =
flowFiles.indexOf(flowFileState.flowFile);
+ flowFiles.set(flowFileIndex,
session.putAllAttributes(flowFileState.flowFile, attributes));
+ } catch (final IOException e) {
+ flowFiles.remove(flowFileState.flowFile);
+ session.remove(flowFileState.flowFile);
+ closeSafe(flowFileState, "FlowFile State");
+ final String message = "Failed to complete a FlowFile containing
records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range:
[%s/%d, %s/%d)".formatted(
+ getStreamName(),
+ getKinesisShardId(),
+
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+ );
+ final boolean isSingleKinesisRecordInFlowFile =
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.equals(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+ if (isSingleKinesisRecordInFlowFile) {
+ // in case of a single Kinesis Record in FlowFile, so we can
route it to failure relationship
+ throw new
CompleteFlowFileSingleKinesisRecordException(message, e,
flowFileState.firstSuccessfulWriteInfo.kinesisRecord);
+ }
+ // in case of multiple Kinesis Records the whole batch needs to be
failed, otherwise data can be lost
+ throw new CompleteFlowFileMultipleKinesisRecordException(message,
e);
Review Comment:
To me, the different exception types seem counterintuitive.
Would it be better to perform this check in the error-handling code instead?
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -47,16 +51,19 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class KinesisRecordProcessorRecord extends
AbstractKinesisRecordProcessor {
final RecordReaderFactory readerFactory;
final RecordSetWriterFactory writerFactory;
final Map<String, String> schemaRetrievalVariables;
- private RecordSetWriter writer;
- private OutputStream outputStream;
+ private FlowFileState flowFileState;
Review Comment:
Nit: I reckon a name like `currentFlowFile` or `openFlowFile` would better
convey the meaning of this field.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -128,62 +177,94 @@ void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kin
}
}
- private void createWriter(final FlowFile flowFile, final ProcessSession
session, final Record outputRecord)
- throws IOException, SchemaNotFoundException {
-
- final RecordSchema readerSchema = outputRecord.getSchema();
- final RecordSchema writeSchema =
writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
- outputStream = session.write(flowFile);
- writer = writerFactory.createWriter(getLogger(), writeSchema,
outputStream, flowFile);
- writer.beginRecordSet();
+ private static byte @NotNull [] getData(final KinesisClientRecord
kinesisRecord) {
+ final ByteBuffer dataBuffer = kinesisRecord.data();
+ final byte[] data = dataBuffer != null ? new
byte[dataBuffer.remaining()] : new byte[0];
+ if (dataBuffer != null) {
+ dataBuffer.get(data);
+ }
+ return data;
}
- private void completeFlowFile(final List<FlowFile> flowFiles, final
ProcessSession session, final int recordCount,
- final WriteResult writeResult, final
KinesisClientRecord lastRecord, final StopWatch stopWatch)
- throws IOException {
-
+ /**
+ * Initializes the FlowFile state for the current processing. This
includes creating a new FlowFile, initializing the RecordSetWriter, and setting
up the output stream.
+ * In case of an exception during initialization, the FlowFile is removed
from the session and the resources are closed properly.
+ */
+ private FlowFileState initializeState(final ProcessSession session, final
Record outputRecord, final FlowFileState previousFlowFileState) throws
IOException, SchemaNotFoundException {
+ FlowFile flowFile = null;
+ OutputStream outputStream = null;
+ RecordSetWriter writer = null;
try {
- writer.finishRecordSet();
- } catch (IOException e) {
- getLogger().error("Failed to finish record output due to {}",
e.getLocalizedMessage(), e);
- session.remove(flowFiles.get(0));
- flowFiles.remove(0);
- throw e;
- } finally {
- try {
- writer.close();
- outputStream.close();
- } catch (final IOException e) {
- getLogger().warn("Failed to close Record Writer due to {}",
e.getLocalizedMessage(), e);
+ flowFile = session.create();
+ // If we have a previous schema, we need to merge it with the
current schema to ensure that the writer can handle all fields from both
schemas going forward.
+ // There is an assumption here that all the records are expected
by the downstream to have the same-ish schema. It is possible to imagine a
scenario where records with different schemas
+ // should be written to separate FlowFiles.
+ final RecordSchema newReadSchema =
Optional.ofNullable(previousFlowFileState).map(it ->
+ DataTypeUtils.merge(it.recordSchema,
outputRecord.getSchema())).orElse(outputRecord.getSchema());
+
+ final RecordSchema writeSchema =
writerFactory.getSchema(schemaRetrievalVariables, newReadSchema);
+ outputStream = session.write(flowFile);
+ writer = writerFactory.createWriter(getLogger(), writeSchema,
outputStream, flowFile);
+ writer.beginRecordSet();
+ } catch (final Exception e) {
+ if (flowFile != null) {
+ session.remove(flowFile);
}
+ closeSafe(writer, "Record Writer");
+ closeSafe(outputStream, "Output Stream");
+ throw e;
}
+ return new FlowFileState(flowFile, writer, outputStream,
outputRecord.getSchema());
+ }
- reportProvenance(session, flowFiles.get(0), null, null, stopWatch);
-
- final Map<String, String> attributes =
getDefaultAttributes(lastRecord);
- attributes.put("record.count", String.valueOf(recordCount));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- flowFiles.set(0, session.putAllAttributes(flowFiles.get(0),
attributes));
+ private void completeFlowFile(final List<FlowFile> flowFiles, final
ProcessSession session, final StopWatch stopWatch)
+ throws CompleteFlowFileSingleKinesisRecordException {
+ if (flowFileState.isFlowFileEmpty()) {
+ flowFiles.remove(flowFileState.flowFile);
+ session.remove(flowFileState.flowFile);
+ closeSafe(flowFileState, "FlowFile State");
+ return;
+ }
+ try {
+ flowFileState.writer.finishRecordSet();
+ closeSafe(flowFileState, "FlowFile State");
+ reportProvenance(session, flowFileState.flowFile, null, null,
stopWatch);
- writer = null;
- outputStream = null;
+ final Map<String, String> attributes =
getDefaultAttributes(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
+ attributes.put("record.count",
String.valueOf(flowFileState.lastSuccessfulWriteInfo.writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
flowFileState.writer.getMimeType());
+
attributes.putAll(flowFileState.lastSuccessfulWriteInfo.writeResult.getAttributes());
+ final int flowFileIndex =
flowFiles.indexOf(flowFileState.flowFile);
+ flowFiles.set(flowFileIndex,
session.putAllAttributes(flowFileState.flowFile, attributes));
+ } catch (final IOException e) {
+ flowFiles.remove(flowFileState.flowFile);
+ session.remove(flowFileState.flowFile);
+ closeSafe(flowFileState, "FlowFile State");
+ final String message = "Failed to complete a FlowFile containing
records from Stream Name: %s, Shard Id: %s, Sequence/Subsequence No range:
[%s/%d, %s/%d)".formatted(
+ getStreamName(),
+ getKinesisShardId(),
+
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.subSequenceNumber(),
+
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.sequenceNumber(),
+
flowFileState.lastSuccessfulWriteInfo.kinesisRecord.subSequenceNumber()
+ );
+ final boolean isSingleKinesisRecordInFlowFile =
flowFileState.firstSuccessfulWriteInfo.kinesisRecord.equals(flowFileState.lastSuccessfulWriteInfo.kinesisRecord);
Review Comment:
Nit: let's encapsulate the logic of determining how many records there are
in the current FlowFile into the `FlowFileState` class.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/record/KinesisRecordProcessorRecord.java:
##########
@@ -75,51 +82,93 @@ public KinesisRecordProcessorRecord(final
ProcessSessionFactory sessionFactory,
@Override
void startProcessingRecords() {
super.startProcessingRecords();
- outputStream = null;
- writer = null;
+ if (flowFileState != null) {
+ getLogger().warn("FlowFile State is not null at the start of
processing records, this is not expected.");
+ closeSafe(flowFileState, "FlowFile State");
+ flowFileState = null;
+ }
}
@Override
- void processRecord(final List<FlowFile> flowFiles, final
KinesisClientRecord kinesisRecord, final boolean lastRecord,
+ void finishProcessingRecords(final ProcessSession session, final
List<FlowFile> flowFiles, final StopWatch stopWatch) {
+ super.finishProcessingRecords(session, flowFiles, stopWatch);
+ try {
+ if (flowFileState == null) {
+ return;
+ }
+ if (!flowFiles.contains(flowFileState.flowFile)) {
+ getLogger().warn("Currently processed FlowFile is no longer
available at processing end, this is not expected.", flowFiles);
+ closeSafe(flowFileState, "FlowFile State");
+ return;
+ }
+ completeFlowFile(flowFiles, session, stopWatch);
+ } catch (CompleteFlowFileSingleKinesisRecordException e) {
+ final boolean removeFirstFlowFileIfAvailable = true;
Review Comment:
Nit: that's the "current" FlowFile now, not the "first".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]