mattyb149 commented on code in PR #6907: URL: https://github.com/apache/nifi/pull/6907#discussion_r1114665037
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java: ########## @@ -44,35 +51,114 @@ protected void writeJson(T event) throws IOException { } protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) { - return new HashMap<String, String>() { - { - put(SEQUENCE_ID_KEY, Long.toString(sequenceId)); - put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType()); - String gtidSet = eventInfo.getBinlogGtidSet(); - if (gtidSet == null) { - put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename()); - put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition())); - } else { - put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet); - } - put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); - } - }; + final Map<String, String> commonAttributeMap = new HashMap<>(); + + commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId)); + commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType()); + String gtidSet = eventInfo.getBinlogGtidSet(); + if (gtidSet == null) { + commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename()); + commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition())); + } else { + commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet); + } + commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + + return commonAttributeMap; } // Default implementation for binlog events @Override - public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, (outputStream) -> { + public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship, + final 
EventWriterConfiguration eventWriterConfiguration) { + configureEventWriter(eventWriterConfiguration, session, eventInfo); + + OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream(); + try { super.startJson(outputStream, eventInfo); writeJson(eventInfo); // Nothing in the body super.endJson(); - }); - flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); - session.transfer(flowFile, relationship); - session.getProvenanceReporter().receive(flowFile, transitUri); + } catch (IOException ioe) { + throw new UncheckedIOException("Write JSON start array failed", ioe); + } + + eventWriterConfiguration.incrementNumberOfEventsWritten(); + + // Check if it is time to finish the FlowFile + if (nEventsPerFlowFile(eventWriterConfiguration) + && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) { + finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship); + } return currentSequenceId + 1; } + + public void finishAndTransferFlowFile(final ProcessSession session, final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId, + final BinlogEventInfo eventInfo, final Relationship relationship) { + if (writtenMultipleEvents(eventWriterConfiguration)) { + try { + jsonGenerator.writeEndArray(); + } catch (IOException ioe) { + throw new UncheckedIOException("Write JSON end array failed", ioe); + } + } + try { + endFile(); + + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (session == null || flowFile == null) { + throw new ProcessException("No open FlowFile or ProcessSession to write to"); + } + flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId, eventInfo)); + session.transfer(flowFile, relationship); + session.getProvenanceReporter().receive(flowFile, transitUri); + + eventWriterConfiguration.cleanUp(); + } catch 
(IOException ioe) { + throw new FlowFileAccessException("Failed to close event writer", ioe); + } + } + + protected void configureEventWriter(final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final EventInfo eventInfo) { + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (flowFile == null) { + flowFile = session.create(); + OutputStream flowFileOutputStream = session.write(flowFile); + if (eventWriterConfiguration.getJsonGenerator() == null) { + try { + jsonGenerator = createJsonGenerator(flowFileOutputStream); + } catch (IOException ioe) { + throw new UncheckedIOException("JSON Generator creation failed", ioe); + } + } + if (multipleEventsPerFlowFile(eventWriterConfiguration)) { + try { + jsonGenerator.writeStartArray(); + } catch (IOException ioe) { + throw new UncheckedIOException("Write JSON start array failed", ioe); + } + } + eventWriterConfiguration.startNewFlowFile(flowFile, flowFileOutputStream, jsonGenerator); + } + jsonGenerator = eventWriterConfiguration.getJsonGenerator(); + } + + private boolean multipleEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) { + return (nEventsPerFlowFile(eventWriterConfiguration) + && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1) + || oneTransactionPerFlowFile(eventWriterConfiguration); + } + + private boolean writtenMultipleEvents(EventWriterConfiguration eventWriterConfiguration) { + return eventWriterConfiguration.getNumberOfEventsWritten() > 1 + || oneTransactionPerFlowFile(eventWriterConfiguration); + } + + protected boolean nEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) { Review Comment: Good catch! Will fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org