This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 3321de01f2 NIFI-11305: Fixed out-of-order handling of binlog client 
shutdown in CaptureChangeMySQL (#7068)
3321de01f2 is described below

commit 3321de01f210aaadf9f6a3c9628b8e99a6fa72cf
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Mar 22 09:47:27 2023 -0400

    NIFI-11305: Fixed out-of-order handling of binlog client shutdown in 
CaptureChangeMySQL (#7068)
    
    NIFI-11305: Fixed out-of-order handling of binlog client shutdown in 
CaptureChangeMySQL
    - Changed to allow 'unexpected' events to still be processed
---
 .../nifi/cdc/mysql/event/BinlogEventListener.java  |  8 +++--
 .../cdc/mysql/processors/CaptureChangeMySQL.java   | 36 +++++++++++++++++-----
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
index c0f5800f67..48f35d3e65 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
@@ -18,6 +18,8 @@ package org.apache.nifi.cdc.mysql.event;
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
 import com.github.shyiko.mysql.binlog.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class BinlogEventListener implements BinaryLogClient.EventListener {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(BinlogEventListener.class);
+
     private final AtomicBoolean stopNow = new AtomicBoolean(false);
     private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
 
@@ -57,9 +61,9 @@ public class BinlogEventListener implements 
BinaryLogClient.EventListener {
                 }
             }
 
-            throw new RuntimeException("Stopped while waiting to enqueue 
event");
+            logger.info("Stopped while waiting to enqueue event");
         } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while adding event to the 
queue");
+            logger.warn("Interrupted while adding event to the queue", e);
         }
     }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 4298af4379..76ada9b306 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -719,6 +719,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
             connect(hosts, username, password, serverId, 
createEnrichmentConnection, driverLocation, driverName, connectTimeout, 
sslContextService, sslMode);
         } catch (IOException | IllegalStateException e) {
+            if (eventListener != null) {
+                eventListener.stop();
+                if (binlogClient != null) {
+                    binlogClient.unregisterEventListener(eventListener);
+                }
+            }
             context.yield();
             binlogClient = null;
             throw new ProcessException(e.getMessage(), e);
@@ -901,6 +907,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             }
         }
         if (!binlogClient.isConnected()) {
+            if (eventListener != null) {
+                eventListener.stop();
+                if (binlogClient != null) {
+                    binlogClient.unregisterEventListener(eventListener);
+                }
+            }
             binlogClient.disconnect();
             binlogClient = null;
             throw new IOException("Could not connect binlog client to any of 
the specified hosts due to: " + lastConnectException.getMessage(), 
lastConnectException);
@@ -915,9 +927,18 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                 // Ensure connection can be created.
                 getJdbcConnection();
             } catch (SQLException e) {
-                binlogClient.disconnect();
-                binlogClient = null;
-                throw new IOException("Error creating binlog enrichment JDBC 
connection to any of the specified hosts", e);
+                getLogger().error("Error creating binlog enrichment JDBC 
connection to any of the specified hosts", e);
+                if (eventListener != null) {
+                    eventListener.stop();
+                    if (binlogClient != null) {
+                        binlogClient.unregisterEventListener(eventListener);
+                    }
+                }
+                if (binlogClient != null) {
+                    binlogClient.disconnect();
+                    binlogClient = null;
+                }
+                return;
             }
         }
 
@@ -1151,8 +1172,7 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                     }
                     if (!inTransaction) {
                         // These events should only happen inside a 
transaction, warn the user otherwise
-                        log.warn("Table modification event occurred outside of 
a transaction.");
-                        break;
+                        log.info("Event {} occurred outside of a transaction, 
which is unexpected.", eventType.name());
                     }
                     if (currentTable == null && cacheClient != null) {
                         // No Table Map event was processed prior to this 
event, which should not happen, so throw an error
@@ -1245,15 +1265,15 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     protected void stop() throws CDCException {
         try {
-            if (binlogClient != null) {
-                binlogClient.disconnect();
-            }
             if (eventListener != null) {
                 eventListener.stop();
                 if (binlogClient != null) {
                     binlogClient.unregisterEventListener(eventListener);
                 }
             }
+            if (binlogClient != null) {
+                binlogClient.disconnect();
+            }
 
             if (currentSession != null) {
                 FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();

Reply via email to