This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 3321de01f2 NIFI-11305: Fixed out-of-order handling of binlog client
shutdown in CaptureChangeMySQL (#7068)
3321de01f2 is described below
commit 3321de01f210aaadf9f6a3c9628b8e99a6fa72cf
Author: Matt Burgess <[email protected]>
AuthorDate: Wed Mar 22 09:47:27 2023 -0400
NIFI-11305: Fixed out-of-order handling of binlog client shutdown in
CaptureChangeMySQL (#7068)
NIFI-11305: Fixed out-of-order handling of binlog client shutdown in
CaptureChangeMySQL
- Changed to allow 'unexpected' events to still be processed
---
.../nifi/cdc/mysql/event/BinlogEventListener.java | 8 +++--
.../cdc/mysql/processors/CaptureChangeMySQL.java | 36 +++++++++++++++++-----
2 files changed, 34 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
index c0f5800f67..48f35d3e65 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
@@ -18,6 +18,8 @@ package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BinlogEventListener implements BinaryLogClient.EventListener {
+ private static final Logger logger =
LoggerFactory.getLogger(BinlogEventListener.class);
+
private final AtomicBoolean stopNow = new AtomicBoolean(false);
private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
@@ -57,9 +61,9 @@ public class BinlogEventListener implements
BinaryLogClient.EventListener {
}
}
- throw new RuntimeException("Stopped while waiting to enqueue
event");
+ logger.info("Stopped while waiting to enqueue event");
} catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while adding event to the
queue");
+ logger.warn("Interrupted while adding event to the queue", e);
}
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 4298af4379..76ada9b306 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -719,6 +719,12 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
connect(hosts, username, password, serverId,
createEnrichmentConnection, driverLocation, driverName, connectTimeout,
sslContextService, sslMode);
} catch (IOException | IllegalStateException e) {
+ if (eventListener != null) {
+ eventListener.stop();
+ if (binlogClient != null) {
+ binlogClient.unregisterEventListener(eventListener);
+ }
+ }
context.yield();
binlogClient = null;
throw new ProcessException(e.getMessage(), e);
@@ -901,6 +907,12 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
}
if (!binlogClient.isConnected()) {
+ if (eventListener != null) {
+ eventListener.stop();
+ if (binlogClient != null) {
+ binlogClient.unregisterEventListener(eventListener);
+ }
+ }
binlogClient.disconnect();
binlogClient = null;
throw new IOException("Could not connect binlog client to any of
the specified hosts due to: " + lastConnectException.getMessage(),
lastConnectException);
@@ -915,9 +927,18 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
// Ensure connection can be created.
getJdbcConnection();
} catch (SQLException e) {
- binlogClient.disconnect();
- binlogClient = null;
- throw new IOException("Error creating binlog enrichment JDBC
connection to any of the specified hosts", e);
+ getLogger().error("Error creating binlog enrichment JDBC
connection to any of the specified hosts", e);
+ if (eventListener != null) {
+ eventListener.stop();
+ if (binlogClient != null) {
+ binlogClient.unregisterEventListener(eventListener);
+ }
+ }
+ if (binlogClient != null) {
+ binlogClient.disconnect();
+ binlogClient = null;
+ }
+ return;
}
}
@@ -1151,8 +1172,7 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
if (!inTransaction) {
// These events should only happen inside a
transaction, warn the user otherwise
- log.warn("Table modification event occurred outside of
a transaction.");
- break;
+ log.info("Event {} occurred outside of a transaction,
which is unexpected.", eventType.name());
}
if (currentTable == null && cacheClient != null) {
// No Table Map event was processed prior to this
event, which should not happen, so throw an error
@@ -1245,15 +1265,15 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
protected void stop() throws CDCException {
try {
- if (binlogClient != null) {
- binlogClient.disconnect();
- }
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
+ if (binlogClient != null) {
+ binlogClient.disconnect();
+ }
if (currentSession != null) {
FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();