tpalfy commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1100221116
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -185,6 +194,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             SSLMode.VERIFY_IDENTITY.toString(),
             "Connect with TLS or fail when server support not enabled. Verify server hostname matches presented X.509 certificate names or fail when not matched");
+    private static final AllowableValue N_EVENTS_PER_FLOWFILE_STRATEGY = new AllowableValue(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.name(), "N Events Per FlowFile",
+            "This strategy causes the number of events specified in the Events per FlowFile each binlog event to be written to its own FlowFile");

Review Comment:
   ```suggestion
               "This strategy causes the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile");
   ```
   Btw, don't we want to let the processor write _fewer_ events than specified (if only that many are available during a single `onTrigger` run)?

##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
     }
     protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {
+        return new HashMap<>() {

Review Comment:
   This change doesn't work on Java 8. The diamond operator on an anonymous inner class only compiles on Java 9 and later.

##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1017,15 +1096,19 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
                         CommitTransactionEventInfo commitTransactionEvent = useGtid ?
                                 new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
                                 : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                        currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                        currentEventInfo = commitTransactionEvent;
+                        currentEventWriter = commitEventWriter;
+                        currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));

Review Comment:
   We are in a block that only runs when the emission of BEGIN and COMMIT is requested by the user (on the processor). That means that this does NOT run when it is not requested, and in that case `commitEventWriter.writeEvent` won't run and thus the flowfile will not be sent out.
   In short, it doesn't work when the user wants ONE_TRANSACTION_PER_FLOWFILE but doesn't want BEGIN and COMMIT events.

##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -967,13 +1026,19 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
                     CommitTransactionEventInfo commitTransactionEvent = useGtid ?
                             new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
                             : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
-                    currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                    currentEventInfo = commitTransactionEvent;
+                    currentEventWriter = commitEventWriter;
+                    currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
                 }
+                //update inTransaction value to state
                 inTransaction = false;
                 updateState(session);
-                // Commit the NiFi session
-                session.commitAsync();
+                // If there is no FlowFile open, commit the session
+                if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+                    // Commit the NiFi session
+                    session.commitAsync();

Review Comment:
   The process session is committed only after a transaction, which is somewhat desirable. But because of this, the N events per flowfile strategy can hold back a flowfile that already has N events in it until a commit comes. But the commit may never actually come. Consider the following case:
   - N=5
   - 1st transaction inserts 4 files.
   - 2nd transaction inserts 2 files.
   - When the event for the first insert of the 2nd transaction is received, we finish up the current flowfile (it has 4 events from the 1st transaction and 1 event from the 2nd transaction). However, we don't commit the process session (we are not handling a commit event!), so the flowfile is held back.
   - When the commit for the 2nd transaction is received, the new flowfile has only 1 event in it and is still being worked on (not null). So we don't commit the process session.
   - A bunch of flowfiles can queue up this way until a commit arrives at a moment when the current flowfile has exactly N events, or until we stop the processor.
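
   The N=5 case can be replayed with a small standalone model. This is only a sketch, not the processor code: the class `HeldBackFlowFileSketch` and its `simulate` method are hypothetical names, only insert events are counted (BEGIN and COMMIT events are assumed not to be written), and "no flowfile open" is modeled as zero events in the current flowfile.

   ```java
   import java.util.Arrays;

   // Hypothetical, simplified model of the commit logic discussed above.
   // It does not use any NiFi classes.
   public class HeldBackFlowFileSketch {

       /**
        * Replays a series of transactions.
        *
        * @param eventsPerFlowFile     N, the number of events that completes a flowfile
        * @param insertsPerTransaction number of insert events in each transaction
        * @return {committed flowfiles, finished but uncommitted flowfiles, events in the open flowfile}
        */
       public static int[] simulate(int eventsPerFlowFile, int[] insertsPerTransaction) {
           int committed = 0;   // flowfiles released by a session commit
           int heldBack = 0;    // flowfiles finished but still waiting for a session commit
           int openEvents = 0;  // events in the flowfile currently being written (0 = none open)

           for (int inserts : insertsPerTransaction) {
               for (int i = 0; i < inserts; i++) {
                   openEvents++;
                   if (openEvents == eventsPerFlowFile) {
                       // The flowfile is complete, but the session is not committed here.
                       heldBack++;
                       openEvents = 0;
                   }
               }
               // Commit event of the transaction: the session is committed
               // only if no flowfile is currently open.
               if (openEvents == 0) {
                   committed += heldBack;
                   heldBack = 0;
               }
           }
           return new int[] {committed, heldBack, openEvents};
       }

       public static void main(String[] args) {
           // N=5, 1st transaction has 4 inserts, 2nd transaction has 2 inserts
           System.out.println(Arrays.toString(simulate(5, new int[] {4, 2})));
           // prints [0, 1, 1]: one complete flowfile is held back, nothing was committed
       }
   }
   ```

   Under these assumptions the complete flowfile is only released once some later transaction happens to end exactly on a flowfile boundary, e.g. `simulate(5, new int[] {4, 2, 4})` returns `[2, 0, 0]`.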