This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1f2f3fc5f0 [Test][E2E] Add thread leak check for connector (#5773)
1f2f3fc5f0 is described below
commit 1f2f3fc5f0925f755a4de2a3f1b8e590bc1a88be
Author: Jia Fan <[email protected]>
AuthorDate: Thu Feb 1 10:51:41 2024 +0800
[Test][E2E] Add thread leak check for connector (#5773)
---
.../file/hadoop/HadoopFileSystemProxy.java | 6 +-
.../seatunnel/file/sink/BaseFileSinkWriter.java | 6 +-
.../file/sink/BaseMultipleTableFileSink.java | 20 +--
.../file/sink/writer/AbstractWriteStrategy.java | 10 ++
.../seatunnel/file/sink/writer/WriteStrategy.java | 3 +-
.../file/source/BaseFileSourceReader.java | 4 +-
.../file/source/reader/AbstractReadStrategy.java | 10 ++
.../reader/MultipleTableFileSourceReader.java | 3 +
.../seatunnel/file/source/reader/ReadStrategy.java | 3 +-
.../seatunnel/file/s3/catalog/S3FileCatalog.java | 11 +-
.../seatunnel/file/sftp/system/SFTPFileSystem.java | 6 +
.../source/enumerator/AbstractSplitEnumerator.java | 3 +
.../iceberg/source/reader/IcebergSourceReader.java | 3 +
.../influxdb/sink/InfluxDBSinkWriter.java | 3 +
.../source/InfluxDBSourceSplitEnumerator.java | 4 +-
.../influxdb/source/InfluxdbSourceReader.java | 3 +
.../rocketmq/sink/RocketMqSinkWriter.java | 3 +
.../rocketmq/source/RocketMqSourceReader.java | 3 +
.../source/RocketMqSourceSplitEnumerator.java | 3 +
.../container/seatunnel/SeaTunnelContainer.java | 166 ++++++++++++++++++++-
.../seatunnel/e2e/common/util/ContainerUtil.java | 65 ++++++++
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 37 ++---
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 2 +
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 19 +++
.../src/test/resources/jvm_options | 3 +
.../src/test/resources/log4j2.properties | 4 +-
.../engine/common/config/SeaTunnelConfig.java | 1 +
.../engine/common/loader/ClassLoaderUtil.java | 35 +++++
.../core/parse/MultipleTableJobConfigParser.java | 56 +++----
.../seatunnel/engine/server/SeaTunnelServer.java | 6 +
.../engine/server/TaskExecutionService.java | 13 +-
.../server/checkpoint/CheckpointCoordinator.java | 2 -
.../seatunnel/engine/server/master/JobMaster.java | 12 +-
.../seatunnel/engine/server/rest/RestConstant.java | 3 +
.../server/rest/RestHttpGetCommandProcessor.java | 21 +++
35 files changed, 472 insertions(+), 80 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index a43baa7c31..61f4520e4b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -230,10 +230,14 @@ public class HadoopFileSystemProxy implements
Serializable, Closeable {
@Override
public void close() throws IOException {
- try (FileSystem closedFileSystem = fileSystem) {
+ try {
if (userGroupInformation != null && enableKerberos()) {
userGroupInformation.logoutUserFromKeytab();
}
+ } finally {
+ if (fileSystem != null) {
+ fileSystem.close();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 7d14de2fff..2527d99d56 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -153,5 +153,9 @@ public class BaseFileSinkWriter
}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ if (writeStrategy != null) {
+ writeStrategy.close();
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index fee66ad467..6beb62d7e8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
@@ -46,9 +45,8 @@ public abstract class BaseMultipleTableFileSink
SupportMultiTableSink {
private final HadoopConf hadoopConf;
- private final HadoopFileSystemProxy hadoopFileSystemProxy;
+ private final CatalogTable catalogTable;
private final FileSinkConfig fileSinkConfig;
- private final WriteStrategy writeStrategy;
private String jobId;
public abstract String getPluginName();
@@ -58,10 +56,7 @@ public abstract class BaseMultipleTableFileSink
this.hadoopConf = hadoopConf;
this.fileSinkConfig =
new FileSinkConfig(readonlyConfig.toConfig(),
catalogTable.getSeaTunnelRowType());
- this.writeStrategy =
- WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
- this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
-
this.writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ this.catalogTable = catalogTable;
}
@Override
@@ -72,7 +67,7 @@ public abstract class BaseMultipleTableFileSink
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) {
- return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
+ return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf,
context, jobId, states);
}
@Override
@@ -84,7 +79,7 @@ public abstract class BaseMultipleTableFileSink
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
createWriter(
SinkWriter.Context context) {
- return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
+ return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf,
context, jobId);
}
@Override
@@ -101,4 +96,11 @@ public abstract class BaseMultipleTableFileSink
public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}
+
+ protected WriteStrategy createWriteStrategy() {
+ WriteStrategy writeStrategy =
+ WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
+
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ return writeStrategy;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 7f30e723ae..ab88b2256d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -414,4 +414,14 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
public HadoopFileSystemProxy getHadoopFileSystemProxy() {
return hadoopFileSystemProxy;
}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (hadoopFileSystemProxy != null) {
+ hadoopFileSystemProxy.close();
+ }
+ } catch (Exception ignore) {
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index a2fb5c1510..6a1b1840b4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -26,11 +26,12 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
import org.apache.hadoop.conf.Configuration;
+import java.io.Closeable;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
-public interface WriteStrategy extends Transaction, Serializable {
+public interface WriteStrategy extends Transaction, Serializable, Closeable {
/**
* init hadoop conf
*
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 37b525df67..1119ae9bf3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -48,7 +48,9 @@ public class BaseFileSourceReader implements
SourceReader<SeaTunnelRow, FileSour
public void open() throws Exception {}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ readStrategy.close();
+ }
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 071414c9da..33b688af06 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -201,4 +201,14 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
return true;
}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (hadoopFileSystemProxy != null) {
+ hadoopFileSystemProxy.close();
+ }
+ } catch (Exception ignore) {
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
index 66775dd6d7..661a466e49 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/MultipleTableFileSourceReader.java
@@ -123,5 +123,8 @@ public class MultipleTableFileSourceReader implements
SourceReader<SeaTunnelRow,
public void close() throws IOException {
// do nothing
log.info("Closed the MultipleTableLocalFileSourceReader");
+ for (ReadStrategy strategy : readStrategyMap.values()) {
+ strategy.close();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 462d58b076..a269594e1f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -25,11 +25,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-public interface ReadStrategy extends Serializable {
+public interface ReadStrategy extends Serializable, Closeable {
void init(HadoopConf conf);
void read(String path, String tableId, Collector<SeaTunnelRow> output)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
index 4a527e2a60..0f48a2c4ae 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
+import java.io.IOException;
import java.util.List;
@AllArgsConstructor
@@ -47,7 +48,15 @@ public class S3FileCatalog implements Catalog {
public void open() throws CatalogException {}
@Override
- public void close() throws CatalogException {}
+ public void close() throws CatalogException {
+ if (hadoopFileSystemProxy != null) {
+ try {
+ hadoopFileSystemProxy.close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
@Override
public String name() {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
index 555fb5bf9d..f49145bc4c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
@@ -617,4 +617,10 @@ public class SFTPFileSystem extends FileSystem {
disconnect(channel);
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ connectionPool.shutdown();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index 26a971cc2e..03785b40da 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.apache.iceberg.Table;
+import org.apache.iceberg.util.ThreadPools;
import lombok.Getter;
import lombok.NonNull;
@@ -74,6 +75,8 @@ public abstract class AbstractSplitEnumerator
public void close() throws IOException {
icebergTableLoader.close();
isOpen = false;
+ // TODO we should remove shutdown logic when supported closed part task
+ ThreadPools.getWorkerPool().shutdownNow();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
index 2242704860..a12fc79b51 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFil
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.util.ThreadPools;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@@ -95,6 +96,8 @@ public class IcebergSourceReader implements
SourceReader<SeaTunnelRow, IcebergFi
icebergFileScanTaskSplitReader.close();
}
icebergTableLoader.close();
+ // TODO we should remove shutdown logic when supported closed part task
+ ThreadPools.getWorkerPool().shutdownNow();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index f2d401db51..c97ab6c2c6 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -36,6 +36,7 @@ import org.influxdb.dto.Point;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
import java.io.IOException;
import java.net.ConnectException;
@@ -88,6 +89,8 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
if (influxdb != null) {
influxdb.close();
influxdb = null;
+ // TODO we should remove shutdown logic when supported closed part
task
+ ((TaskRunner.RealBackend)
TaskRunner.INSTANCE.getBackend()).shutdown();
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index be69fac9c0..d810a2192f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceSt
import org.apache.commons.lang3.tuple.Pair;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
import java.util.ArrayList;
import java.util.Collection;
@@ -220,7 +221,8 @@ public class InfluxDBSourceSplitEnumerator
@Override
public void close() {
- // nothing to do
+ // TODO we should remove shutdown logic when supported closed part task
+ ((TaskRunner.RealBackend) TaskRunner.INSTANCE.getBackend()).shutdown();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
index 16eec22839..184ec3ecaf 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -35,6 +35,7 @@ import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.internal.concurrent.TaskRunner;
import java.net.ConnectException;
import java.util.ArrayList;
@@ -93,6 +94,8 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
if (influxdb != null) {
influxdb.close();
influxdb = null;
+ // TODO we should remove shutdown logic when supported closed part
task
+ ((TaskRunner.RealBackend)
TaskRunner.INSTANCE.getBackend()).shutdown();
}
}
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
index a02887fa98..b8cbaf70da 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
@@ -49,6 +49,9 @@ public class RocketMqSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
new RocketMqNoTransactionSender(
producerMetadata.getConfiguration(),
producerMetadata.isSync());
}
+ // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+ // `AsyncAppender-Dispatcher-Thread`
+ System.setProperty("rocketmq.client.logUseSlf4j", "true");
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 42c3788e81..fd4f986072 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -75,6 +75,9 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
this.executorService =
Executors.newCachedThreadPool(r -> new Thread(r, "RocketMq
Source Data Consumer"));
pendingPartitionsQueue = new LinkedBlockingQueue<>();
+ // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+ // `AsyncAppender-Dispatcher-Thread`
+ System.setProperty("rocketmq.client.logUseSlf4j", "true");
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 6630d495f9..45ded447f2 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -67,6 +67,9 @@ public class RocketMqSourceSplitEnumerator
this.context = context;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
+ // Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
+ // `AsyncAppender-Dispatcher-Thread`
+ System.setProperty("rocketmq.client.logUseSlf4j", "true");
}
public RocketMqSourceSplitEnumerator(
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index dd9b14f1ea..8a98a50e20 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -23,6 +23,14 @@ import
org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -30,15 +38,22 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
+import groovy.lang.Tuple2;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.file.Paths;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
@@ -46,10 +61,12 @@ import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PA
@Slf4j
@AutoService(TestContainer.class)
public class SeaTunnelContainer extends AbstractTestContainer {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
private static final String CLIENT_SHELL = "seatunnel.sh";
private static final String SERVER_SHELL = "seatunnel-cluster.sh";
protected GenericContainer<?> server;
+ private final AtomicInteger runningCount = new AtomicInteger();
@Override
public void startUp() throws Exception {
@@ -68,7 +85,7 @@ public class SeaTunnelContainer extends AbstractTestContainer
{
"seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
- server.setExposedPorts(Arrays.asList(5801));
+ server.setPortBindings(Collections.singletonList("5801:5801"));
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
@@ -152,7 +169,145 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
public Container.ExecResult executeJob(String confFile)
throws IOException, InterruptedException {
log.info("test in container: {}", identifier());
- return executeJob(server, confFile);
+ List<String> beforeThreads = ContainerUtil.getJVMThreadNames(server);
+ runningCount.incrementAndGet();
+ Container.ExecResult result = executeJob(server, confFile);
+ if (runningCount.decrementAndGet() > 0) {
+ // only check thread when job all finished.
+ return result;
+ }
+ List<String> afterThreads = ContainerUtil.getJVMThreadNames(server);
+ afterThreads = removeSystemThread(beforeThreads, afterThreads);
+ if (afterThreads.isEmpty()) {
+ // classLoaderObjectCheck(1);
+ return result;
+ } else {
+ // Waiting 10s for release thread
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ List<String> threads =
ContainerUtil.getJVMThreadNames(server);
+ threads = removeSystemThread(beforeThreads,
threads);
+ List<String> finalAfterThreads = threads;
+ Assertions.assertTrue(
+ threads.isEmpty(),
+ "There are still threads running in
the container: \n"
+ +
ContainerUtil.getJVMThreads(server).stream()
+ .filter(
+ tuple2 ->
+
finalAfterThreads.contains(
+
tuple2.getV1()))
+ .map(Tuple2::getV2)
+ .map(str -> str + "\n")
+
.collect(Collectors.joining()));
+ });
+ }
+ // classLoaderObjectCheck(1);
+ return result;
+ }
+
+ private List<String> removeSystemThread(List<String> beforeThreads,
List<String> afterThreads)
+ throws IOException {
+ Pattern aqsThread = Pattern.compile("pool-[0-9]-thread-[0-9]");
+ afterThreads.removeIf(
+ s ->
+ s.startsWith("hz.main")
+ ||
s.startsWith("seatunnel-coordinator-service")
+ || s.startsWith("GC task thread")
+ || s.contains("CompilerThread")
+ ||
s.contains("NioNetworking-closeListenerExecutor")
+ || s.contains("ForkJoinPool.commonPool")
+ || s.contains("DestroyJavaVM")
+ || s.contains("main-query-state-checker")
+ || s.contains("Keep-Alive-SocketCleaner")
+ || s.contains("process reaper")
+ || s.startsWith("Timer-")
+ || s.contains("InterruptTimer")
+ || s.contains("Java2D Disposer")
+ || s.contains(
+
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
+ || s.startsWith("Log4j2-TF-")
+ || aqsThread.matcher(s).matches());
+ afterThreads.removeIf(beforeThreads::contains);
+ Map<String, String> threadAndClassLoader = getThreadClassLoader();
+ List<String> notSystemClassLoaderThread =
+ threadAndClassLoader.entrySet().stream()
+ .filter(
+ tc -> {
+ // system thread, ttl 60s
+ if (tc.getKey().contains("process
reaper")) {
+ return false;
+ }
+ String classLoader = tc.getValue();
+ return
!classLoader.contains("AppClassLoader")
+ && !classLoader.equals("null");
+ })
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ notSystemClassLoaderThread.addAll(afterThreads);
+ notSystemClassLoaderThread.removeIf(this::isIssueWeAlreadyKnow);
+ return notSystemClassLoaderThread;
+ }
+
+ private void classLoaderObjectCheck(Integer maxSize) throws IOException,
InterruptedException {
+ Map<String, Integer> objects = ContainerUtil.getJVMLiveObject(server);
+ String className =
+
"org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader";
+ if (objects.containsKey(className) && objects.get(className) >
maxSize) {
+ Awaitility.await()
+ .atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ Map<String, Integer> newObjects =
+ ContainerUtil.getJVMLiveObject(server);
+ if (newObjects.containsKey(className)) {
+ Assertions.assertTrue(
+ newObjects.get(className) <=
maxSize,
+ "There are still
SeaTunnelChildFirstClassLoader objects in the seatunnel server");
+ }
+ });
+ }
+ }
+
+ private Map<String, String> getThreadClassLoader() throws IOException {
+ HttpGet get = new
HttpGet("http://localhost:5801/hazelcast/rest/maps/running-threads");
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ CloseableHttpResponse response = client.execute(get);
+ String threads = EntityUtils.toString(response.getEntity());
+ List<Map<String, String>> value =
+ OBJECT_MAPPER.readValue(
+ threads, new TypeReference<List<Map<String,
String>>>() {});
+ return value.stream()
+ .collect(
+ Collectors.toMap(
+ map -> map.get("threadName"),
+ map -> map.get("classLoader"),
+ (a, b) -> a + " && " + b));
+ }
+ }
+
+ /** The thread should be recycled but not, we should fix it in the future.
*/
+ private boolean isIssueWeAlreadyKnow(String threadName) {
+ // ClickHouse com.clickhouse.client.ClickHouseClientBuilder
+ return threadName.startsWith("ClickHouseClientWorker")
+ // InfluxDB okio.AsyncTimeout$Watchdog
+ || threadName.startsWith("Okio Watchdog")
+ // InfluxDB okhttp3.internal.concurrent.TaskRunner.RealBackend
+ || threadName.startsWith("OkHttp TaskRunner")
+ // IOTDB org.apache.iotdb.session.Session
+ || threadName.startsWith("SessionExecutor")
+ // Oracle Driver
+ //
oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser
+ || threadName.contains(
+
"oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser")
+ // RocketMQ
+ //
org.apache.rocketmq.logging.inner.LoggingBuilder$AsyncAppender$Dispatcher
+ || threadName.startsWith("AsyncAppender-Dispatcher-Thread")
+ // MongoDB
+ || threadName.startsWith("BufferPoolPruner")
+ || threadName.startsWith("MaintenanceTimer")
+ || threadName.startsWith("cluster-");
}
@Override
@@ -164,7 +319,10 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
@Override
public Container.ExecResult restoreJob(String confFile, String jobId)
throws IOException, InterruptedException {
- return restoreJob(server, confFile, jobId);
+ runningCount.incrementAndGet();
+ Container.ExecResult result = restoreJob(server, confFile, jobId);
+ runningCount.decrementAndGet();
+ return result;
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 722a5c18e3..641c90bf31 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -28,19 +28,24 @@ import
org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;
+import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
@@ -256,4 +261,64 @@ public final class ContainerUtil {
Path path, String targetPath, GenericContainer<?> container) {
container.copyFileToContainer(MountableFile.forHostPath(path),
targetPath);
}
+
+ public static List<String> getJVMThreadNames(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ return
getJVMThreads(container).stream().map(Tuple2::getV1).collect(Collectors.toList());
+ }
+
+ public static Map<String, Integer> getJVMLiveObject(GenericContainer<?>
container)
+ throws IOException, InterruptedException {
+ Container.ExecResult liveObjects =
+ container.execInContainer("jmap", "-histo:live",
getJVMProcessId(container));
+ Assertions.assertEquals(0, liveObjects.getExitCode());
+ String value = liveObjects.getStdout().trim();
+ return Arrays.stream(value.split("\n"))
+ .skip(2)
+ .map(
+ str ->
+ Arrays.stream(str.split(" "))
+ .filter(StringUtils::isNotEmpty)
+ .collect(Collectors.toList()))
+ .filter(list -> list.size() == 4)
+ .collect(
+ Collectors.toMap(
+ list -> list.get(3),
+ list -> Integer.valueOf(list.get(1)),
+ (a, b) -> a));
+ }
+
+ public static List<Tuple2<String, String>>
getJVMThreads(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult threads =
+ container.execInContainer("jstack",
getJVMProcessId(container));
+ Assertions.assertEquals(0, threads.getExitCode());
+ // Thread name line example
+ // "hz.main.MetricsRegistry.thread-2" #232 prio=5 os_prio=0
tid=0x0000ffff3c003000 nid=0x5e
+ // waiting on condition [0x0000ffff6cf3a000]
+ return Arrays.stream(threads.getStdout().trim().split("\n\n"))
+ .filter(s -> s.startsWith("\""))
+ .map(
+ threadStr ->
+ new Tuple2<>(
+ Arrays.stream(threadStr.split("\n"))
+ .filter(s ->
s.startsWith("\""))
+ .map(s -> s.substring(1,
s.lastIndexOf("\"")))
+ .findFirst()
+ .get(),
+ threadStr))
+ .collect(Collectors.toList());
+ }
+
+ private static String getJVMProcessId(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ Container.ExecResult processes = container.execInContainer("jps");
+ Assertions.assertEquals(0, processes.getExitCode());
+ Optional<String> server =
+ Arrays.stream(processes.getStdout().trim().split("\n"))
+ .filter(s -> s.contains("SeaTunnelServer"))
+ .findFirst();
+ Assertions.assertTrue(server.isPresent());
+ return server.get().trim().split(" ")[0];
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 6e02dd6478..013cfd33de 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -26,6 +26,8 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
import org.testcontainers.containers.Container;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -41,6 +44,7 @@ import java.util.regex.Pattern;
import static org.awaitility.Awaitility.await;
@Slf4j
+@DisabledOnJre(value = JRE.JAVA_11, disabledReason = "slf4j jar conflict, we
should fix it later")
public class CheckpointEnableIT extends TestSuiteBase {
@TestTemplate
@@ -89,18 +93,19 @@ public class CheckpointEnableIT extends TestSuiteBase {
disabledReason =
"depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
public void testZetaStreamingCheckpointInterval(TestContainer container)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, ExecutionException {
// start job
- CompletableFuture.supplyAsync(
- () -> {
- try {
- return container.executeJob(
-
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
- } catch (Exception e) {
- log.error("Commit task exception :" + e.getMessage());
- throw new RuntimeException(e);
- }
- });
+ CompletableFuture<Container.ExecResult> startFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" +
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
// wait obtain job id
AtomicReference<String> jobId = new AtomicReference<>();
@@ -121,16 +126,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
Thread.sleep(15000);
Assertions.assertTrue(container.getServerLogs().contains("checkpoint
is enabled"));
Assertions.assertEquals(0,
container.savepointJob(jobId.get()).getExitCode());
-
+ Assertions.assertEquals(0, startFuture.get().getExitCode());
// restore job
CompletableFuture.supplyAsync(
() -> {
try {
- return container
- .restoreJob(
-
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
- jobId.get())
- .getExitCode();
+ return container.restoreJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+ jobId.get());
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index ce54ba84c2..9f5ddf8c38 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -34,6 +34,7 @@ import org.testcontainers.utility.MountableFile;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collections;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
@@ -59,6 +60,7 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
.waitingFor(Wait.forListeningPort());
copySeaTunnelStarterToContainer(server);
server.setExposedPorts(Arrays.asList(5801));
+ server.setPortBindings(Collections.singletonList("5801:5801"));
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 2d2fc5d96e..c6cb429c0a 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
@Slf4j
public class RestApiIT {
@@ -126,6 +127,24 @@ public class RestApiIT {
});
}
+ @Test
+ public void testGetRunningThreads() {
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance ->
+ given().get(
+ HOST
+ + instance.getCluster()
+
.getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_THREADS)
+ .then()
+ .statusCode(200)
+ .body("[0].threadName", notNullValue())
+ .body("[0].classLoader",
notNullValue()));
+ }
+
@Test
public void testSystemMonitoringInformation() {
Arrays.asList(node2, node1)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
index 75facf75c0..f7d00c6eaf 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/jvm_options
@@ -22,3 +22,6 @@
# JVM Dump
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
+
+# Only used for tests!!! We should make sure soft references are collected ASAP
+-XX:SoftRefLRUPolicyMSPerMB=1
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
index bfcd94a55a..6b6c6335ec 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
@@ -16,10 +16,10 @@
# limitations under the License.
################################################################################
-rootLogger.level = WARN
+rootLogger.level = INFO
logger.zeta.name=org.apache.seatunnel.engine
-logger.zeta.level=WARN
+logger.zeta.level=INFO
# For print job id
logger.zetaMaster.name=org.apache.seatunnel.engine.server.master
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
index dfbfd2c66b..4c4b7e798e 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -49,6 +49,7 @@ public class SeaTunnelConfig {
hazelcastConfig
.getHotRestartPersistenceConfig()
.setBaseDir(new File(seatunnelHome(),
"recovery").getAbsoluteFile());
+ System.setProperty("hazelcast.compat.classloading.cache.disabled",
"true");
}
/**
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
new file mode 100644
index 0000000000..cb6e0820f0
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.loader;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ClassLoaderUtil {
+
+ public static void recycleClassLoaderFromThread(ClassLoader classLoader) {
+ log.info("recycle classloader " + classLoader);
+ Thread.getAllStackTraces().keySet().stream()
+ .filter(thread -> thread.getContextClassLoader() ==
classLoader)
+ .forEach(
+ thread -> {
+ log.info("recycle classloader for thread " +
thread.getName());
+ thread.setContextClassLoader(null);
+ });
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index cc6cb501cd..137da2e0a1 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -164,37 +164,39 @@ public class MultipleTableJobConfigParser {
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
+ ClassLoader parentClassLoader =
Thread.currentThread().getContextClassLoader();
ClassLoader classLoader =
- new SeaTunnelChildFirstClassLoader(
- connectorJars,
Thread.currentThread().getContextClassLoader());
- Thread.currentThread().setContextClassLoader(classLoader);
-
- ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs,
sinkConfigs);
-
- this.fillJobConfig();
-
- LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap =
- new LinkedHashMap<>();
-
- log.info("start generating all sources.");
- for (int configIndex = 0; configIndex < sourceConfigs.size();
configIndex++) {
- Config sourceConfig = sourceConfigs.get(configIndex);
- Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
- parseSource(configIndex, sourceConfig, classLoader);
- tableWithActionMap.put(tuple2._1(), tuple2._2());
- }
+ new SeaTunnelChildFirstClassLoader(connectorJars,
parentClassLoader);
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs,
sinkConfigs);
+ this.fillJobConfig();
+ LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap =
+ new LinkedHashMap<>();
+
+ log.info("start generating all sources.");
+ for (int configIndex = 0; configIndex < sourceConfigs.size();
configIndex++) {
+ Config sourceConfig = sourceConfigs.get(configIndex);
+ Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
+ parseSource(configIndex, sourceConfig, classLoader);
+ tableWithActionMap.put(tuple2._1(), tuple2._2());
+ }
- log.info("start generating all transforms.");
- parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+ log.info("start generating all transforms.");
+ parseTransforms(transformConfigs, classLoader, tableWithActionMap);
- log.info("start generating all sinks.");
- List<Action> sinkActions = new ArrayList<>();
- for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
- Config sinkConfig = sinkConfigs.get(configIndex);
- sinkActions.addAll(parseSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
+ log.info("start generating all sinks.");
+ List<Action> sinkActions = new ArrayList<>();
+ for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
+ Config sinkConfig = sinkConfigs.get(configIndex);
+ sinkActions.addAll(
+ parseSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
+ }
+ Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
+ return new ImmutablePair<>(sinkActions, factoryUrls);
+ } finally {
+ Thread.currentThread().setContextClassLoader(parentClassLoader);
}
- Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
- return new ImmutablePair<>(sinkActions, factoryUrls);
}
public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index ebb0edea85..970177ae90 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -27,6 +27,8 @@ import
org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
+import org.apache.hadoop.fs.FileSystem;
+
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -113,6 +115,10 @@ public class SeaTunnelServer
TimeUnit.SECONDS);
seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl)
engine).getNode());
+
+        // A workaround for the StatisticsDataReferenceCleaner thread classloader
 leak.
+ // see https://issues.apache.org/jira/browse/HADOOP-19049
+ FileSystem.Statistics statistics = new
FileSystem.Statistics("SeaTunnel");
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 482c4d6712..2c2b38d928 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
+import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
@@ -269,7 +270,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
Set<ConnectorJarIdentifier> connectorJarIdentifiers =
taskImmutableInfo.getConnectorJarIdentifiers();
Set<URL> jars = taskImmutableInfo.getJars();
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoader;
if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
// Prioritize obtaining the jar package file required for the
current task execution
// from the local, if it does not exist locally, it will be
downloaded from the
@@ -292,6 +293,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
classLoader,
taskImmutableInfo.getGroup());
} else {
+ classLoader = new SeaTunnelChildFirstClassLoader(emptyList());
taskGroup =
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
}
@@ -886,8 +888,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
task.getTaskID(), taskGroupLocation));
Throwable ex = executionException.get();
if (completionLatch.decrementAndGet() == 0) {
- // recycle classloader
- executionContexts.get(taskGroupLocation).setClassLoader(null);
+ recycleClassLoader(taskGroupLocation);
finishedExecutionContexts.put(
taskGroupLocation,
executionContexts.remove(taskGroupLocation));
cancellationFutures.remove(taskGroupLocation);
@@ -910,6 +911,12 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
}
+ private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
+ ClassLoader classLoader =
executionContexts.get(taskGroupLocation).getClassLoader();
+ executionContexts.get(taskGroupLocation).setClassLoader(null);
+ ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
+ }
+
boolean executionCompletedExceptionally() {
return executionException.get() != null;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9b93bf91d5..d85c23e927 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -174,7 +174,6 @@ public class CheckpointCoordinator {
2,
runnable -> {
Thread thread = new Thread(runnable);
- thread.setDaemon(true);
thread.setName(
String.format(
"checkpoint-coordinator-%s/%s",
pipelineId, jobId));
@@ -704,7 +703,6 @@ public class CheckpointCoordinator {
2,
runnable -> {
Thread thread = new Thread(runnable);
- thread.setDaemon(true);
thread.setName(
String.format(
"checkpoint-coordinator-%s/%s", pipelineId, jobId));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d9a4a0b94a..ca75e185aa 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -108,8 +109,6 @@ public class JobMaster {
private CompletableFuture<JobResult> jobMasterCompleteFuture;
- private ClassLoader classLoader;
-
private JobImmutableInformation jobImmutableInformation;
private LogicalDag logicalDag;
@@ -128,8 +127,6 @@ public class JobMaster {
private final IMap<Object, Object> runningJobStateTimestampsIMap;
- private CompletableFuture<Void> scheduleFuture;
-
// TODO add config to change value
private boolean isPhysicalDAGIInfo = true;
@@ -204,13 +201,14 @@ public class JobMaster {
jobImmutableInformation.getJobId(),
jobImmutableInformation.getPluginJarsUrls()));
- classLoader =
+ ClassLoader classLoader =
new
SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
logicalDag =
CustomClassLoadedObject.deserializeWithCustomClassLoader(
nodeEngine.getSerializationService(),
classLoader,
jobImmutableInformation.getLogicalDag());
+ ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
PlanUtils.fromLogicalDAG(
@@ -444,10 +442,6 @@ public class JobMaster {
"can't find task group address from taskGroupLocation: " +
taskGroupLocation);
}
- public ClassLoader getClassLoader() {
- return classLoader;
- }
-
public void cancelJob() {
physicalPlan.cancelJob();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index daa3da6e09..a7e93f2551 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -51,6 +51,9 @@ public class RestConstant {
public static final String SUBMIT_JOB_URL =
"/hazelcast/rest/maps/submit-job";
public static final String ENCRYPT_CONFIG =
"/hazelcast/rest/maps/encrypt-config";
+ // only for test use
+ public static final String RUNNING_THREADS =
"/hazelcast/rest/maps/running-threads";
+
public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 6d9619abbf..af351f5278 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -67,6 +67,7 @@ import static
com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_THREADS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCommand> {
@@ -104,6 +105,8 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
handleJobInfoById(httpGetCommand, uri);
} else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
getSystemMonitoringInformation(httpGetCommand);
+ } else if (uri.startsWith(RUNNING_THREADS)) {
+ getRunningThread(httpGetCommand);
} else {
original.handle(httpGetCommand);
}
@@ -276,6 +279,24 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
}
}
+ private void getRunningThread(HttpGetCommand command) {
+ this.prepareResponse(
+ command,
+ Thread.getAllStackTraces().keySet().stream()
+ .sorted(Comparator.comparing(Thread::getName))
+ .map(
+ stackTraceElements -> {
+ JsonObject jobInfoJson = new JsonObject();
+ jobInfoJson.add("threadName",
stackTraceElements.getName());
+ jobInfoJson.add(
+ "classLoader",
+ String.valueOf(
+
stackTraceElements.getContextClassLoader()));
+ return jobInfoJson;
+ })
+ .collect(JsonArray::new, JsonArray::add,
JsonArray::add));
+ }
+
private Map<String, Long> getJobMetrics(String jobMetrics) {
Map<String, Long> metricsMap = new HashMap<>();
long sourceReadCount = 0L;