This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 0c49959f6 [FLINK-35524][cdc-base] Clear connection pools when reader
exits. (#3388)
0c49959f6 is described below
commit 0c49959f678d6ea4269f78b3080f35cbf1f58601
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Aug 6 15:24:39 2024 +0800
[FLINK-35524][cdc-base] Clear connection pools when reader exits. (#3388)
---
.../cdc/connectors/base/dialect/DataSourceDialect.java | 7 ++++++-
.../cdc/connectors/base/dialect/JdbcDataSourceDialect.java | 6 ++++++
.../base/relational/connection/JdbcConnectionPools.java | 9 +++++++++
.../base/source/assigner/HybridSplitAssigner.java | 3 ++-
.../base/source/assigner/SnapshotSplitAssigner.java | 5 ++++-
.../cdc/connectors/base/source/assigner/SplitAssigner.java | 6 ++++--
.../base/source/assigner/StreamSplitAssigner.java | 5 ++++-
.../base/source/enumerator/IncrementalSourceEnumerator.java | 3 ++-
.../mysql/source/assigners/MySqlBinlogSplitAssigner.java | 7 ++++++-
.../mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 3 +++
.../mysql/source/assigners/MySqlSplitAssigner.java | 6 ++++--
.../mysql/source/connection/JdbcConnectionPools.java | 8 ++++++++
.../mysql/source/enumerator/MySqlSourceEnumerator.java | 3 ++-
.../source/assigners/MySqlBinlogSplitAssignerTest.java | 13 +++++++------
14 files changed, 67 insertions(+), 17 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
index 8d5e810bf..ce5b34335 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -27,6 +27,8 @@ import
org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import java.io.Closeable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -37,7 +39,7 @@ import java.util.Map;
* @param <C> The source config of data source.
*/
@Experimental
-public interface DataSourceDialect<C extends SourceConfig> extends
Serializable {
+public interface DataSourceDialect<C extends SourceConfig> extends
Serializable, Closeable {
/** Get the name of dialect. */
String getName();
@@ -78,4 +80,7 @@ public interface DataSourceDialect<C extends SourceConfig>
extends Serializable
/** Check if the tableId is included in SourceConfig. */
boolean isIncludeDataCollection(C sourceConfig, TableId tableId);
+
+ @Override
+ default void close() throws IOException {}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
index 557c187d2..136e73dda 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import
org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+import
org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
@@ -28,6 +29,7 @@ import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -59,4 +61,8 @@ public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfi
@Override
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase
sourceSplitBase);
+
+ default void close() throws IOException {
+ JdbcConnectionPools.getInstance(getPooledDataSourceFactory()).clear();
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
index 4c4df6f1b..625cb8db9 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
@@ -25,6 +25,7 @@ import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -83,4 +84,12 @@ public class JdbcConnectionPools implements
ConnectionPools<HikariDataSource, Jd
}
return jdbcConnectionPoolFactory.getJdbcUrl(sourceConfig);
}
+
+ public void clear() throws IOException {
+ synchronized (pools) {
+ pools.values().stream().forEach(HikariDataSource::close);
+ pools.clear();
+ POOL_FACTORY_MAP.clear();
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
index ae715b279..6764daabd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
@@ -32,6 +32,7 @@ import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -218,7 +219,7 @@ public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigne
}
@Override
- public void close() {
+ public void close() throws IOException {
snapshotSplitAssigner.close();
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index d424e89b7..cd0e77200 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -398,7 +399,9 @@ public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssig
}
@Override
- public void close() {}
+ public void close() throws IOException {
+ dialect.close();
+ }
@Override
public boolean noMoreSplits() {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
index 4dd9e4904..0740ff402 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
@@ -24,6 +24,8 @@ import
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,7 @@ import java.util.Optional;
* determines split processing order.
*/
@Experimental
-public interface SplitAssigner {
+public interface SplitAssigner extends Closeable {
/**
* Called to open the assigner to acquire any resources, like threads or
network connections.
@@ -120,5 +122,5 @@ public interface SplitAssigner {
* Called to close the assigner, in case it holds on to any resources,
like threads or network
* connections.
*/
- void close();
+ void close() throws IOException;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index 74bd04fca..1e7b2fa1e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -28,6 +28,7 @@ import
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -133,7 +134,9 @@ public class StreamSplitAssigner implements SplitAssigner {
}
@Override
- public void close() {}
+ public void close() throws IOException {
+ dialect.close();
+ }
//
------------------------------------------------------------------------------------------
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index 05dcc6bd2..057833484 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -187,7 +188,7 @@ public class IncrementalSourceEnumerator
}
@Override
- public void close() {
+ public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index dd80e23b8..aa35e47a6 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -20,12 +20,14 @@ package
org.apache.flink.cdc.connectors.mysql.source.assigners;
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.util.CollectionUtil;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -121,7 +123,10 @@ public class MySqlBinlogSplitAssigner implements
MySqlSplitAssigner {
public void onBinlogSplitUpdated() {}
@Override
- public void close() {}
+ public void close() throws IOException {
+ // clear jdbc connection pools
+ JdbcConnectionPools.getInstance().clear();
+ }
//
------------------------------------------------------------------------------------------
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index e2dd426a6..e209921b5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -23,6 +23,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitte
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
@@ -489,6 +490,8 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
if (chunkSplitter != null) {
try {
chunkSplitter.close();
+ // clear jdbc connection pools
+ JdbcConnectionPools.getInstance().clear();
} catch (Exception e) {
LOG.warn("Fail to close the chunk splitter.");
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
index 2673d9d7e..ac5f1c0c0 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
@@ -24,6 +24,8 @@ import
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,7 @@ import java.util.Optional;
* determines split processing order.
*/
@Internal
-public interface MySqlSplitAssigner {
+public interface MySqlSplitAssigner extends Closeable {
/**
* Called to open the assigner to acquire any resources, like threads or
network connections.
@@ -120,5 +122,5 @@ public interface MySqlSplitAssigner {
* Called to close the assigner, in case it holds on to any resources,
like threads or network
* connections.
*/
- void close();
+ void close() throws IOException;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
index 4396156fe..9505a559a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
@@ -23,6 +23,7 @@ import com.zaxxer.hikari.HikariDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -51,4 +52,11 @@ public class JdbcConnectionPools implements ConnectionPools {
return pools.get(poolId);
}
}
+
+ public void clear() throws IOException {
+ synchronized (pools) {
+ pools.values().stream().forEach(HikariDataSource::close);
+ pools.clear();
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index 50d7607b6..3ab2ab509 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -190,7 +191,7 @@ public class MySqlSourceEnumerator implements
SplitEnumerator<MySqlSplit, Pendin
}
@Override
- public void close() {
+ public void close() throws IOException {
LOG.info("Closing enumerator...");
splitAssigner.close();
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
index 536dfbee5..9822f1afd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.junit.Test;
+import java.io.IOException;
import java.time.ZoneId;
import java.util.Optional;
@@ -40,36 +41,36 @@ import static org.junit.Assert.assertTrue;
public class MySqlBinlogSplitAssignerTest {
@Test
- public void testStartFromEarliest() {
+ public void testStartFromEarliest() throws IOException {
checkAssignedBinlogOffset(StartupOptions.earliest(),
BinlogOffset.ofEarliest());
}
@Test
- public void testStartFromLatestOffset() {
+ public void testStartFromLatestOffset() throws IOException {
checkAssignedBinlogOffset(StartupOptions.latest(),
BinlogOffset.ofLatest());
}
@Test
- public void testStartFromTimestamp() {
+ public void testStartFromTimestamp() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.timestamp(15213000L),
BinlogOffset.ofTimestampSec(15213L));
}
@Test
- public void testStartFromBinlogFile() {
+ public void testStartFromBinlogFile() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-file", 15213),
BinlogOffset.ofBinlogFilePosition("foo-file", 15213L));
}
@Test
- public void testStartFromGtidSet() {
+ public void testStartFromGtidSet() throws IOException {
checkAssignedBinlogOffset(
StartupOptions.specificOffset("foo-gtid"),
BinlogOffset.ofGtidSet("foo-gtid"));
}
private void checkAssignedBinlogOffset(
- StartupOptions startupOptions, BinlogOffset expectedOffset) {
+ StartupOptions startupOptions, BinlogOffset expectedOffset) throws
IOException {
// Set starting from the given option
MySqlBinlogSplitAssigner assigner = new
MySqlBinlogSplitAssigner(getConfig(startupOptions));
// Get splits from assigner