mark-bathori commented on code in PR #6907: URL: https://github.com/apache/nifi/pull/6907#discussion_r1101506026
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java: ########## @@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogE // Default implementation for binlog events @Override - public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, (outputStream) -> { + public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship, + final EventWriterConfiguration eventWriterConfiguration) { + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (flowFile == null) { + flowFile = session.create(); + OutputStream flowFileOutputStream = session.write(flowFile); + eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream); + eventWriterConfiguration.setCurrentFlowFile(flowFile); + if (eventWriterConfiguration.getJsonGenerator() == null) { + try { + jsonGenerator = createJsonGenerator(flowFileOutputStream); + eventWriterConfiguration.setJsonGenerator(jsonGenerator); + } catch (IOException ioe) { + throw new FlowFileAccessException("Couldn't create JSON generator", ioe); + } + } + if ((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()) + && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1) + || FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) { + try { + jsonGenerator.writeStartArray(); + } catch (IOException ioe) { + throw new FlowFileAccessException("Couldn't write start of event array", ioe); + } + } + } + jsonGenerator = eventWriterConfiguration.getJsonGenerator(); + + OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream(); + try { super.startJson(outputStream, eventInfo); writeJson(eventInfo); // Nothing in the body super.endJson(); - }); - flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); + } catch (IOException ioe) { + throw new FlowFileAccessException("Couldn't write start of event array", ioe); + } + + eventWriterConfiguration.incrementNumberOfEventsWritten(); + + // Check if it is time to finish the FlowFile + if (FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()) + && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) { + flowFile = finishAndTransferFlowFile(eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship); + } + eventWriterConfiguration.setCurrentFlowFile(flowFile); + return currentSequenceId + 1; + } + + public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId, + final BinlogEventInfo eventInfo, final Relationship relationship) { + // If writing multiple events, end the array + if (eventWriterConfiguration.getNumberOfEventsWritten() > 1 + || FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) { + try { + jsonGenerator.writeEndArray(); + } catch (IOException ioe) { + throw new FlowFileAccessException("Couldn't write end of event array", ioe); + } + } + try { + endFile(); + eventWriterConfiguration.setJsonGenerator(null); + eventWriterConfiguration.getFlowFileOutputStream().close(); + } catch (IOException ioe) { + throw new FlowFileAccessException("Couldn't flush and close file", ioe); + } + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + ProcessSession session = eventWriterConfiguration.getWorkingSession(); + if (session == null && flowFile == null) { 
Review Comment: I think the `&&` operator should be `||` here, since if either condition is false while the other is true (i.e. only one of `session` and `flowFile` is null), the check is skipped and we will get a NullPointerException in the next line. The exception's description also assumes an 'OR' relation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org