mark-bathori commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1101506026


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);
+                }
+            }
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if 
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            flowFile = finishAndTransferFlowFile(eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
+        eventWriterConfiguration.setCurrentFlowFile(flowFile);
+        return currentSequenceId + 1;
+    }
+
+    public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration 
eventWriterConfiguration, final String transitUri, final long seqId,
+                                              final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        // If writing multiple events, end the array
+        if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new FlowFileAccessException("Couldn't write end of event 
array", ioe);
+            }
+        }
+        try {
+            endFile();
+            eventWriterConfiguration.setJsonGenerator(null);
+            eventWriterConfiguration.getFlowFileOutputStream().close();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't flush and close file", 
ioe);
+        }
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        ProcessSession session = eventWriterConfiguration.getWorkingSession();
+        if (session == null && flowFile == null) {

Review Comment:
   I think the `&&` operator should be `||` here since if either condition is 
false we will got NullPointerException in the next line. The exception's 
description also assumes 'OR' relation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to