exceptionfactory commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1101893920


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {
+        return new HashMap<>() {
             {
                 put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
                 put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());

Review Comment:
   This change fails to compile on Java 8. This is a good opportunity to 
refactor and avoid the use of anonymous extensions of `HashMap`.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);

Review Comment:
   Recommend avoiding conjunctions in error message wording. Is 
`FlowFileAccessException` the best choice in this scenario? Perhaps an 
`UncheckedIOException` would be better.
   ```suggestion
                       throw new UncheckedIOException("JSON Generator creation 
failed", ioe);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);
+                }
+            }
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if 
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            flowFile = finishAndTransferFlowFile(eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
+        eventWriterConfiguration.setCurrentFlowFile(flowFile);
+        return currentSequenceId + 1;
+    }
+
+    public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration 
eventWriterConfiguration, final String transitUri, final long seqId,
+                                              final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        // If writing multiple events, end the array
+        if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new FlowFileAccessException("Couldn't write end of event 
array", ioe);
+            }
+        }
+        try {
+            endFile();
+            eventWriterConfiguration.setJsonGenerator(null);
+            eventWriterConfiguration.getFlowFileOutputStream().close();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't flush and close file", 
ioe);

Review Comment:
   ```suggestion
               throw new UncheckedIOException("Failed to close event writer", 
ioe);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);
+                }
+            }
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if 
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            flowFile = finishAndTransferFlowFile(eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
+        eventWriterConfiguration.setCurrentFlowFile(flowFile);
+        return currentSequenceId + 1;
+    }
+
+    public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration 
eventWriterConfiguration, final String transitUri, final long seqId,
+                                              final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        // If writing multiple events, end the array
+        if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new FlowFileAccessException("Couldn't write end of event 
array", ioe);

Review Comment:
   ```suggestion
                   throw new UncheckedIOException("Write JSON start array 
failed", ioe);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java:
##########
@@ -16,28 +16,46 @@
  */
 package org.apache.nifi.cdc.mysql.event.io;
 
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
 
 /**
  * A writer class to output MySQL binlog Data Definition Language (DDL) events 
to flow file(s).
  */
 public class DDLEventWriter extends 
AbstractBinlogTableEventWriter<DDLEventInfo> {
 
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, 
DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, 
DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = configureEventWriter(eventWriterConfiguration, 
session, eventInfo);
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+
+        try {
             super.startJson(outputStream, eventInfo);
             super.writeJson(eventInfo);
             jsonGenerator.writeStringField("query", eventInfo.getQuery());
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);

Review Comment:
   ```suggestion
               throw new UncheckedIOException("Write JSON start array failed", 
ioe);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -269,6 +284,31 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("capture-change-mysql-events-per-flowfile")
+            .displayName("Events Per FlowFile Strategy")
+            .description("Specifies the strategy to use when writing events to 
FlowFile(s)")
+            .required(true)
+            .sensitive(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(N_EVENTS_PER_FLOWFILE_STRATEGY, 
ONE_TRANSACTION_PER_FLOWFILE_STRATEGY)
+            .defaultValue(N_EVENTS_PER_FLOWFILE_STRATEGY.getValue())
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor NUMBER_OF_EVENTS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
+            .name("capture-change-mysql-number-of-events-per-flowfile")
+            .displayName("Number of Events Per FlowFile")

Review Comment:
   This seems more verbose than necessary, what do you think about shorter 
names?
   ```suggestion
               .name("events-per-flowfile")
               .displayName("Events Per FlowFile")
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1241,6 +1335,23 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) 
throws SQLException {
         return tableInfo;
     }
 
+    protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {
+        return new HashMap<>() {
+            {
+                put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+                put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+                String gtidSet = eventInfo.getBinlogGtidSet();
+                if (gtidSet == null) {
+                    put(BinlogEventInfo.BINLOG_FILENAME_KEY, 
eventInfo.getBinlogFilename());
+                    put(BinlogEventInfo.BINLOG_POSITION_KEY, 
Long.toString(eventInfo.getBinlogPosition()));
+                } else {
+                    put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+                }
+                put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+            }
+        };

Review Comment:
   This does not compile on Java 8. Although the goal is to move to Java 11 
soon, using an anonymous extension of HashMap should be avoided.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -269,6 +284,31 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("capture-change-mysql-events-per-flowfile")
+            .displayName("Events Per FlowFile Strategy")

Review Comment:
   What do you think about calling this `Event Processing Strategy`?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);

Review Comment:
   ```suggestion
                       throw new UncheckedIOException("Write JSON start array 
failed", ioe);
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);
+                }
+            }
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if 
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            flowFile = finishAndTransferFlowFile(eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
+        eventWriterConfiguration.setCurrentFlowFile(flowFile);
+        return currentSequenceId + 1;
+    }
+
+    public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration 
eventWriterConfiguration, final String transitUri, final long seqId,
+                                              final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        // If writing multiple events, end the array
+        if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new FlowFileAccessException("Couldn't write end of event 
array", ioe);
+            }
+        }
+        try {
+            endFile();
+            eventWriterConfiguration.setJsonGenerator(null);
+            eventWriterConfiguration.getFlowFileOutputStream().close();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't flush and close file", 
ioe);
+        }
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        ProcessSession session = eventWriterConfiguration.getWorkingSession();
+        if (session == null && flowFile == null) {
+            throw new FlowFileAccessException("No open FlowFile or 
ProcessSession to write to");

Review Comment:
   Is `FlowFileAccessException` the best option? Perhaps `ProcessException`?
   ```suggestion
               throw new ProcessException("No open FlowFile or ProcessSession 
found for writing");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to