mattyb149 commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1114665037


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,35 +51,114 @@ protected void writeJson(T event) throws IOException {
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {
-            {
-                put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
-                put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
-                String gtidSet = eventInfo.getBinlogGtidSet();
-                if (gtidSet == null) {
-                    put(BinlogEventInfo.BINLOG_FILENAME_KEY, 
eventInfo.getBinlogFilename());
-                    put(BinlogEventInfo.BINLOG_POSITION_KEY, 
Long.toString(eventInfo.getBinlogPosition()));
-                } else {
-                    put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
-                }
-                put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
-            }
-        };
+        final Map<String, String> commonAttributeMap = new HashMap<>();
+
+        commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+        commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, 
eventInfo.getEventType());
+        String gtidSet = eventInfo.getBinlogGtidSet();
+        if (gtidSet == null) {
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, 
eventInfo.getBinlogFilename());
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, 
Long.toString(eventInfo.getBinlogPosition()));
+        } else {
+            commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
gtidSet);
+        }
+        commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), 
APPLICATION_JSON);
+
+        return commonAttributeMap;
     }
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        configureEventWriter(eventWriterConfiguration, session, eventInfo);
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
-        session.transfer(flowFile, relationship);
-        session.getProvenanceReporter().receive(flowFile, transitUri);
+        } catch (IOException ioe) {
+            throw new UncheckedIOException("Write JSON start array failed", 
ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if (nEventsPerFlowFile(eventWriterConfiguration)
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            finishAndTransferFlowFile(session, eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
         return currentSequenceId + 1;
     }
+
+    public void finishAndTransferFlowFile(final ProcessSession session, final 
EventWriterConfiguration eventWriterConfiguration, final String transitUri, 
final long seqId,
+                                          final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        if (writtenMultipleEvents(eventWriterConfiguration)) {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new UncheckedIOException("Write JSON end array failed", 
ioe);
+            }
+        }
+        try {
+            endFile();
+
+            FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+            if (session == null || flowFile == null) {
+                throw new ProcessException("No open FlowFile or ProcessSession 
to write to");
+            }
+            flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(seqId, eventInfo));
+            session.transfer(flowFile, relationship);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            eventWriterConfiguration.cleanUp();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Failed to close event writer", 
ioe);
+        }
+    }
+
+    protected void configureEventWriter(final EventWriterConfiguration 
eventWriterConfiguration, final ProcessSession session, final EventInfo 
eventInfo) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                } catch (IOException ioe) {
+                    throw new UncheckedIOException("JSON Generator creation 
failed", ioe);
+                }
+            }
+            if (multipleEventsPerFlowFile(eventWriterConfiguration)) {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new UncheckedIOException("Write JSON start array 
failed", ioe);
+                }
+            }
+            eventWriterConfiguration.startNewFlowFile(flowFile, 
flowFileOutputStream, jsonGenerator);
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+    }
+
+    private boolean multipleEventsPerFlowFile(EventWriterConfiguration 
eventWriterConfiguration) {
+        return (nEventsPerFlowFile(eventWriterConfiguration)
+                && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1)
+                || oneTransactionPerFlowFile(eventWriterConfiguration);
+    }
+
+    private boolean writtenMultipleEvents(EventWriterConfiguration 
eventWriterConfiguration) {
+        return eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || oneTransactionPerFlowFile(eventWriterConfiguration);
+    }
+
+    protected boolean nEventsPerFlowFile(EventWriterConfiguration 
eventWriterConfiguration) {

Review Comment:
   Good catch! Will fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to