This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 02e6daf7c [hotfix] fix unclosed catalog connection (#3673)
02e6daf7c is described below
commit 02e6daf7cdac76d1fdcc9ff7ce146765b55abb67
Author: Kerwin <[email protected]>
AuthorDate: Fri Jul 5 13:38:24 2024 +0800
[hotfix] fix unclosed catalog connection (#3673)
---
.../sink/cdc/CdcDynamicTableParsingProcessFunction.java | 8 ++++++++
.../sink/cdc/CdcMultiplexRecordChannelComputer.java | 8 ++++----
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 4 ++++
.../sink/cdc/CdcMultiplexRecordChannelComputerTest.java | 8 ++++++++
.../sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java | 6 +++++-
.../apache/paimon/flink/compact/MultiTableScanBase.java | 10 +++++++++-
.../flink/compact/MultiUnawareBucketTableScan.java | 5 -----
.../AppendOnlyMultiTableCompactionWorkerOperator.java | 5 +++++
.../flink/sink/MultiTablesStoreCompactOperator.java | 4 ++++
.../apache/paimon/flink/sink/StoreMultiCommitter.java | 3 +++
.../operator/CombinedAwareBatchSourceFunction.java | 8 ++++++++
.../operator/CombinedAwareStreamingSourceFunction.java | 8 ++++++++
.../operator/CombinedUnawareBatchSourceFunction.java | 8 ++++++++
.../CombinedUnawareStreamingSourceFunction.java | 8 ++++++++
.../flink/source/operator/MultiTablesReadOperator.java | 3 +++
.../paimon/flink/sink/StoreMultiCommitterTest.java | 8 ++++++++
.../source/MultiTablesCompactorSourceBuilderITCase.java | 17 +++++++++--------
17 files changed, 102 insertions(+), 19 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 87aefeb58..0961ff160 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -124,4 +124,12 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) {
return CdcMultiplexRecord.fromCdcRecord(databaseName, tableName, record);
}
+
+ @Override
+ public void close() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index 928a319c4..fdad3a921 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -43,7 +43,6 @@ public class CdcMultiplexRecordChannelComputer implements
ChannelComputer<CdcMul
private transient int numChannels;
private Map<Identifier, CdcRecordChannelComputer> channelComputers;
- private Catalog catalog;
public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
this.catalogLoader = catalogLoader;
@@ -52,7 +51,6 @@ public class CdcMultiplexRecordChannelComputer implements
ChannelComputer<CdcMul
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.catalog = catalogLoader.load();
this.channelComputers = new HashMap<>();
}
@@ -72,11 +70,13 @@ public class CdcMultiplexRecordChannelComputer implements
ChannelComputer<CdcMul
Identifier.create(record.databaseName(), record.tableName()),
id -> {
FileStoreTable table;
- try {
+ try (Catalog catalog = catalogLoader.load()) {
table = (FileStoreTable) catalog.getTable(id);
} catch (Catalog.TableNotExistException e) {
- LOG.error("Failed to get table " + id.getFullName());
+            LOG.error("Failed to get table {}", id.getFullName(), e);
return null;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
if (table.bucketMode() != BucketMode.HASH_FIXED) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 08bedbdb9..a604b3045 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -216,6 +216,10 @@ public class CdcRecordStoreMultiWriteOperator
if (compactExecutor != null) {
compactExecutor.shutdownNow();
}
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 658f1abf1..ce0d484f4 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -114,6 +115,13 @@ public class CdcMultiplexRecordChannelComputerTest {
}
}
+ @AfterEach
+ public void after() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ }
+ }
+
@Test
public void testSchemaWithPartition() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 08d5fa72c..2a1bb4004 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -145,11 +145,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
}
@AfterEach
- public void after() {
+ public void after() throws Exception {
// assert all connections are closed
Predicate<Path> pathPredicate = path ->
path.toString().contains(tempDir.toString());
assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+
+ if (catalog != null) {
+ catalog.close();
+ }
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
index 05ff4ea9f..52995efac 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
@@ -48,7 +48,7 @@ import static
org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompa
* ,such as unaware bucket table.
* </ol>
*/
-public abstract class MultiTableScanBase<T> {
+public abstract class MultiTableScanBase<T> implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(MultiTableScanBase.class);
protected final Pattern includingPattern;
@@ -130,6 +130,14 @@ public abstract class MultiTableScanBase<T> {
/** Add the scan table to the table map. */
abstract void addScanTable(FileStoreTable fileStoreTable, Identifier
identifier);
+ @Override
+ public void close() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+ }
+
/** the result of table scanning. */
public enum ScanResult {
FINISHED,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
index a56109d3e..ca1b9b5cb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
@@ -25,9 +25,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -42,8 +39,6 @@ import java.util.regex.Pattern;
public class MultiUnawareBucketTableScan
extends MultiTableScanBase<MultiTableAppendOnlyCompactionTask> {
- private static final Logger LOG =
LoggerFactory.getLogger(MultiUnawareBucketTableScan.class);
-
protected transient Map<Identifier, AppendOnlyTableCompactionCoordinator>
tablesMap;
public MultiUnawareBucketTableScan(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 762e9a6df..57b2337fa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -164,6 +164,11 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
}
}
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
+
ExceptionUtils.throwMultiException(exceptions);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 52e494b5a..b25ec0dfc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -203,6 +203,10 @@ public class MultiTablesStoreCompactOperator
for (StoreSinkWrite write : writes.values()) {
write.close();
}
+ if (catalog != null) {
+ catalog.close();
+ catalog = null;
+ }
}
private FileStoreTable getTable(Identifier tableId) throws
InterruptedException {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index efa2aefe3..6f0eec319 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -208,5 +208,8 @@ public class StoreMultiCommitter
for (StoreCommitter committer : tableCommitters.values()) {
committer.close();
}
+ if (catalog != null) {
+ catalog.close();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index 2d6423fce..22ef33029 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -114,4 +114,12 @@ public class CombinedAwareBatchSourceFunction
split -> ((DataSplit) split.f0).bucket())
.transform(name, typeInfo, new
MultiTablesReadOperator(catalogLoader, false));
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index 4df38ef99..bff690ea3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -121,4 +121,12 @@ public class CombinedAwareStreamingSourceFunction
split -> ((DataSplit) split.f0).bucket())
.transform(name, typeInfo, new
MultiTablesReadOperator(catalogLoader, true));
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 748ccebbe..93063bc15 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -121,4 +121,12 @@ public class CombinedUnawareBatchSourceFunction
return new DataStream<>(env, transformation);
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
index 3323af601..790c68c3f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
@@ -113,4 +113,12 @@ public class CombinedUnawareStreamingSourceFunction
.forceNonParallel()
.rebalance();
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index a5a17cd44..d43245212 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -129,5 +129,8 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
if (ioManager != null) {
ioManager.close();
}
+ if (catalog != null) {
+ catalog.close();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 693e66e93..832da65f7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -58,6 +58,7 @@ import
org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -145,6 +146,13 @@ class StoreMultiCommitterTest {
secondTablePath = ((FileStoreTable)
catalog.getTable(secondTable)).location();
}
+ @AfterEach
+ public void after() throws Exception {
+ if (catalog != null) {
+ catalog.close();
+ }
+ }
+
// ------------------------------------------------------------------------
// Recoverable operator tests
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index b53710dc6..d1acde411 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -499,14 +499,15 @@ public class MultiTablesCompactorSourceBuilderITCase
extends AbstractTestBase
List<String> primaryKeys,
Map<String, String> options)
throws Exception {
- Catalog catalog = catalogLoader().load();
- Identifier identifier = Identifier.create(databaseName, tableName);
- catalog.createDatabase(databaseName, true);
- catalog.createTable(
- identifier,
-            new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
- false);
- return (FileStoreTable) catalog.getTable(identifier);
+ try (Catalog catalog = catalogLoader().load()) {
+ Identifier identifier = Identifier.create(databaseName, tableName);
+ catalog.createDatabase(databaseName, true);
+ catalog.createTable(
+ identifier,
+                    new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
+ false);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
}
private GenericRow rowData(Object... values) {