This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 02e6daf7c [hotfix] fix unclosed catalog connection (#3673)
02e6daf7c is described below

commit 02e6daf7cdac76d1fdcc9ff7ce146765b55abb67
Author: Kerwin <[email protected]>
AuthorDate: Fri Jul 5 13:38:24 2024 +0800

    [hotfix] fix unclosed catalog connection (#3673)
---
 .../sink/cdc/CdcDynamicTableParsingProcessFunction.java |  8 ++++++++
 .../sink/cdc/CdcMultiplexRecordChannelComputer.java     |  8 ++++----
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java      |  4 ++++
 .../sink/cdc/CdcMultiplexRecordChannelComputerTest.java |  8 ++++++++
 .../sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  6 +++++-
 .../apache/paimon/flink/compact/MultiTableScanBase.java | 10 +++++++++-
 .../flink/compact/MultiUnawareBucketTableScan.java      |  5 -----
 .../AppendOnlyMultiTableCompactionWorkerOperator.java   |  5 +++++
 .../flink/sink/MultiTablesStoreCompactOperator.java     |  4 ++++
 .../apache/paimon/flink/sink/StoreMultiCommitter.java   |  3 +++
 .../operator/CombinedAwareBatchSourceFunction.java      |  8 ++++++++
 .../operator/CombinedAwareStreamingSourceFunction.java  |  8 ++++++++
 .../operator/CombinedUnawareBatchSourceFunction.java    |  8 ++++++++
 .../CombinedUnawareStreamingSourceFunction.java         |  8 ++++++++
 .../flink/source/operator/MultiTablesReadOperator.java  |  3 +++
 .../paimon/flink/sink/StoreMultiCommitterTest.java      |  8 ++++++++
 .../source/MultiTablesCompactorSourceBuilderITCase.java | 17 +++++++++--------
 17 files changed, 102 insertions(+), 19 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 87aefeb58..0961ff160 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -124,4 +124,12 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
     private CdcMultiplexRecord wrapRecord(String databaseName, String tableName, CdcRecord record) {
         return CdcMultiplexRecord.fromCdcRecord(databaseName, tableName, record);
     }
+
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index 928a319c4..fdad3a921 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -43,7 +43,6 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMul
     private transient int numChannels;
 
     private Map<Identifier, CdcRecordChannelComputer> channelComputers;
-    private Catalog catalog;
 
     public CdcMultiplexRecordChannelComputer(Catalog.Loader catalogLoader) {
         this.catalogLoader = catalogLoader;
@@ -52,7 +51,6 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMul
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.catalog = catalogLoader.load();
         this.channelComputers = new HashMap<>();
     }
 
@@ -72,11 +70,13 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMul
                 Identifier.create(record.databaseName(), record.tableName()),
                 id -> {
                     FileStoreTable table;
-                    try {
+                    try (Catalog catalog = catalogLoader.load()) {
                         table = (FileStoreTable) catalog.getTable(id);
                     } catch (Catalog.TableNotExistException e) {
-                        LOG.error("Failed to get table " + id.getFullName());
+                        LOG.error("Failed to get table {}", id.getFullName(), e);
                         return null;
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
                     }
 
                     if (table.bucketMode() != BucketMode.HASH_FIXED) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 08bedbdb9..a604b3045 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -216,6 +216,10 @@ public class CdcRecordStoreMultiWriteOperator
         if (compactExecutor != null) {
             compactExecutor.shutdownNow();
         }
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 658f1abf1..ce0d484f4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -114,6 +115,13 @@ public class CdcMultiplexRecordChannelComputerTest {
         }
     }
 
+    @AfterEach
+    public void after() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+        }
+    }
+
     @Test
     public void testSchemaWithPartition() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 08d5fa72c..2a1bb4004 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -145,11 +145,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
     }
 
     @AfterEach
-    public void after() {
+    public void after() throws Exception {
         // assert all connections are closed
         Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
         assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
         assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+
+        if (catalog != null) {
+            catalog.close();
+        }
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
index 05ff4ea9f..52995efac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java
@@ -48,7 +48,7 @@ import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompa
  *           ,such as unaware bucket table.
  *     </ol>
  */
-public abstract class MultiTableScanBase<T> {
+public abstract class MultiTableScanBase<T> implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(MultiTableScanBase.class);
     protected final Pattern includingPattern;
@@ -130,6 +130,14 @@ public abstract class MultiTableScanBase<T> {
     /** Add the scan table to the table map. */
     abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);
 
+    @Override
+    public void close() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+    }
+
     /** the result of table scanning. */
     public enum ScanResult {
         FINISHED,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
index a56109d3e..ca1b9b5cb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java
@@ -25,9 +25,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,8 +39,6 @@ import java.util.regex.Pattern;
 public class MultiUnawareBucketTableScan
         extends MultiTableScanBase<MultiTableAppendOnlyCompactionTask> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MultiUnawareBucketTableScan.class);
-
     protected transient Map<Identifier, AppendOnlyTableCompactionCoordinator> tablesMap;
 
     public MultiUnawareBucketTableScan(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
index 762e9a6df..57b2337fa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -164,6 +164,11 @@ public class AppendOnlyMultiTableCompactionWorkerOperator
             }
         }
 
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
+
         ExceptionUtils.throwMultiException(exceptions);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 52e494b5a..b25ec0dfc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -203,6 +203,10 @@ public class MultiTablesStoreCompactOperator
         for (StoreSinkWrite write : writes.values()) {
             write.close();
         }
+        if (catalog != null) {
+            catalog.close();
+            catalog = null;
+        }
     }
 
     private FileStoreTable getTable(Identifier tableId) throws InterruptedException {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index efa2aefe3..6f0eec319 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -208,5 +208,8 @@ public class StoreMultiCommitter
         for (StoreCommitter committer : tableCommitters.values()) {
             committer.close();
         }
+        if (catalog != null) {
+            catalog.close();
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index 2d6423fce..22ef33029 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -114,4 +114,12 @@ public class CombinedAwareBatchSourceFunction
                         split -> ((DataSplit) split.f0).bucket())
                 .transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, false));
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (tableScan != null) {
+            tableScan.close();
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
index 4df38ef99..bff690ea3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
@@ -121,4 +121,12 @@ public class CombinedAwareStreamingSourceFunction
                         split -> ((DataSplit) split.f0).bucket())
                 .transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, true));
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (tableScan != null) {
+            tableScan.close();
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 748ccebbe..93063bc15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -121,4 +121,12 @@ public class CombinedUnawareBatchSourceFunction
 
         return new DataStream<>(env, transformation);
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (tableScan != null) {
+            tableScan.close();
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
index 3323af601..790c68c3f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
@@ -113,4 +113,12 @@ public class CombinedUnawareStreamingSourceFunction
                 .forceNonParallel()
                 .rebalance();
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (tableScan != null) {
+            tableScan.close();
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index a5a17cd44..d43245212 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -129,5 +129,8 @@ public class MultiTablesReadOperator extends AbstractStreamOperator<RowData>
         if (ioManager != null) {
             ioManager.close();
         }
+        if (catalog != null) {
+            catalog.close();
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 693e66e93..832da65f7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -145,6 +146,13 @@ class StoreMultiCommitterTest {
         secondTablePath = ((FileStoreTable) catalog.getTable(secondTable)).location();
     }
 
+    @AfterEach
+    public void after() throws Exception {
+        if (catalog != null) {
+            catalog.close();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Recoverable operator tests
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index b53710dc6..d1acde411 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -499,14 +499,15 @@ public class MultiTablesCompactorSourceBuilderITCase extends AbstractTestBase
             List<String> primaryKeys,
             Map<String, String> options)
             throws Exception {
-        Catalog catalog = catalogLoader().load();
-        Identifier identifier = Identifier.create(databaseName, tableName);
-        catalog.createDatabase(databaseName, true);
-        catalog.createTable(
-                identifier,
-                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
-                false);
-        return (FileStoreTable) catalog.getTable(identifier);
+        try (Catalog catalog = catalogLoader().load()) {
+            Identifier identifier = Identifier.create(databaseName, tableName);
+            catalog.createDatabase(databaseName, true);
+            catalog.createTable(
+                    identifier,
+                    new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
+                    false);
+            return (FileStoreTable) catalog.getTable(identifier);
+        }
     }
 
     private GenericRow rowData(Object... values) {

Reply via email to