tpalfy commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1100221116


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -185,6 +194,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             SSLMode.VERIFY_IDENTITY.toString(),
             "Connect with TLS or fail when server support not enabled. Verify 
server hostname matches presented X.509 certificate names or fail when not 
matched");
 
+    private static final AllowableValue N_EVENTS_PER_FLOWFILE_STRATEGY = new 
AllowableValue(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.name(), "N 
Events Per FlowFile",
+            "This strategy causes the number of events specified in the Events 
per FlowFile each binlog event to be written to its own FlowFile");

Review Comment:
   ```suggestion
               "This strategy causes the number of events specified in the 
'Number of Events Per FlowFile' property to be written per FlowFile");
   ```
   
   Btw, don't we want to let the processor write _fewer_ events than specified (if only that many are available during a single `onTrigger` run)?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {
+        return new HashMap<>() {

Review Comment:
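   This change doesn't compile on Java 8: using the diamond operator (`<>`) with an anonymous inner class is only supported from Java 9 onward.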
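
   A minimal sketch of two Java 8-compatible alternatives, for illustration only: either keep the explicit type arguments on the anonymous subclass, or drop the anonymous subclass and fill a plain `HashMap`. The class name, method names, and the `sequence.id` key are hypothetical and not taken from the PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class CommonAttributesSketch {

       // Option 1: anonymous subclass with explicit type arguments (compiles on Java 8).
       static Map<String, String> withExplicitTypes(final long sequenceId) {
           return new HashMap<String, String>() {
               {
                   put("sequence.id", Long.toString(sequenceId)); // hypothetical key
               }
           };
       }

       // Option 2: plain HashMap; the diamond operator is fine here on Java 7 and later.
       static Map<String, String> withPlainMap(final long sequenceId) {
           final Map<String, String> attributes = new HashMap<>();
           attributes.put("sequence.id", Long.toString(sequenceId)); // hypothetical key
           return attributes;
       }

       public static void main(String[] args) {
           // Both options produce maps with the same entries.
           System.out.println(withExplicitTypes(7L).equals(withPlainMap(7L)));
       }
   }
   ```

   Option 2 also avoids creating an extra anonymous class that captures the enclosing instance.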



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1017,15 +1096,19 @@ public void outputEvents(ProcessSession session, 
ComponentLog log) throws IOExce
                         CommitTransactionEventInfo commitTransactionEvent = 
useGtid
                                 ? new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
                                 : new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                        
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                        currentEventInfo = commitTransactionEvent;
+                        currentEventWriter = commitEventWriter;
+                        
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, 
eventWriterConfiguration));

Review Comment:
   We are in a block that only runs when the user has requested (on the processor) that BEGIN and COMMIT events be emitted.
   When they are not requested, this block does NOT run, so `commitEventWriter.writeEvent` won't run and the flowfile will not be sent out.
   
   In short, it doesn't work when the user wants ONE_TRANSACTION_PER_FLOWFILE but doesn't want BEGIN and COMMIT events.
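
   One possible shape of a fix, as a simplified sketch rather than the processor's actual code: finish the flowfile on every commit, outside the branch that decides whether the COMMIT event itself is written. All names below (`TransactionPerFlowFileSketch`, `includeBeginCommit`, `onRowEvent`, and so on) are hypothetical, and flowfiles are modeled as plain lists of strings.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class TransactionPerFlowFileSketch {
       private final boolean includeBeginCommit;                    // hypothetical stand-in for the processor setting
       private final List<String> openFlowFile = new ArrayList<>(); // events of the current transaction
       final List<List<String>> transferred = new ArrayList<>();    // finished "flowfiles"

       TransactionPerFlowFileSketch(final boolean includeBeginCommit) {
           this.includeBeginCommit = includeBeginCommit;
       }

       void onBegin() {
           if (includeBeginCommit) {
               openFlowFile.add("BEGIN");
           }
       }

       void onRowEvent(final String event) {
           openFlowFile.add(event);
       }

       void onCommit() {
           if (includeBeginCommit) {
               openFlowFile.add("COMMIT");
           }
           // Finishing the flowfile does not depend on includeBeginCommit.
           if (!openFlowFile.isEmpty()) {
               transferred.add(new ArrayList<>(openFlowFile));
               openFlowFile.clear();
           }
       }

       public static void main(String[] args) {
           final TransactionPerFlowFileSketch sketch = new TransactionPerFlowFileSketch(false);
           sketch.onBegin();
           sketch.onRowEvent("INSERT 1");
           sketch.onRowEvent("INSERT 2");
           sketch.onCommit();
           System.out.println(sketch.transferred); // prints [[INSERT 1, INSERT 2]]
       }
   }
   ```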



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -967,13 +1026,19 @@ public void outputEvents(ProcessSession session, 
ComponentLog log) throws IOExce
                             CommitTransactionEventInfo commitTransactionEvent 
= useGtid
                                     ? new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
                                     : new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                            
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                            currentEventInfo = commitTransactionEvent;
+                            currentEventWriter = commitEventWriter;
+                            
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, 
eventWriterConfiguration));
                         }
+
                         //update inTransaction value to state
                         inTransaction = false;
                         updateState(session);
-                        // Commit the NiFi session
-                        session.commitAsync();
+                        // If there is no FlowFile open, commit the session
+                        if (eventWriterConfiguration.getCurrentFlowFile() == 
null) {
+                            // Commit the NiFi session
+                            session.commitAsync();

Review Comment:
   The process session is committed only after a transaction, which is somewhat desirable. Because of this, however, the N-events-per-flowfile strategy can hold back a flowfile that already has N events in it until a commit comes, and that commit may never actually come.
   Consider the following case:
   - N=5
   - The 1st transaction inserts 4 rows.
   - The 2nd transaction inserts 2 rows.
   - When the event for the first insert of the 2nd transaction is received, we finish the current flowfile (it has 4 events from the 1st transaction and 1 event from the 2nd). However, we don't commit the process session (we are not handling a commit event!), so the flowfile is held back.
   - When the commit for the 2nd transaction is received, the new flowfile has only 1 event in it and is still open (not null), so we don't commit the process session.
   - A bunch of flowfiles can queue up this way until a commit happens to arrive right after a flowfile has been completed with exactly N events, or until we stop the processor.
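
   The scenario above can be reproduced with a small stand-alone model. This is a sketch under stated assumptions, not the processor's code: only insert events are counted, BEGIN/COMMIT events are not written, and the session is committed on a commit event only when no flowfile is open. The class and method names are hypothetical.

   ```java
   public class HeldBackFlowFileSketch {

       /** Returns {finished-but-uncommitted flowfiles, committed flowfiles}. */
       static int[] simulate(final int eventsPerFlowFile, final int... insertsPerTransaction) {
           int openEvents = 0; // events in the open flowfile; 0 models "current flowfile == null"
           int pending = 0;    // finished flowfiles waiting for a session commit
           int committed = 0;  // flowfiles released by a session commit

           for (final int inserts : insertsPerTransaction) {
               for (int i = 0; i < inserts; i++) {
                   openEvents++;
                   if (openEvents == eventsPerFlowFile) {
                       pending++;      // flowfile is finished, but the session is not committed here
                       openEvents = 0;
                   }
               }
               // Commit event of the transaction: commit the session only if no flowfile is open.
               if (openEvents == 0) {
                   committed += pending;
                   pending = 0;
               }
           }
           return new int[] {pending, committed};
       }

       public static void main(String[] args) {
           final int[] result = simulate(5, 4, 2);
           // prints "held back: 1, committed: 0"
           System.out.println("held back: " + result[0] + ", committed: " + result[1]);
       }
   }
   ```

   In this model, the held-back flowfile is only released when a later transaction happens to end exactly on a flowfile boundary, for example `simulate(5, 4, 2, 4)`.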


