[flink-connector-pulsar] branch main updated: [FLINK-32938] Replace pulsar admin calls (#59)

2023-09-20 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new 78d00ea  [FLINK-32938] Replace pulsar admin calls (#59)
78d00ea is described below

commit 78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858
Author: Neng Lu 
AuthorDate: Wed Sep 20 20:14:43 2023 -0700

[FLINK-32938] Replace pulsar admin calls (#59)
---
 .../f4d91193-72ba-4ce4-ad83-98f780dce581   |  14 +-
 .../common/config/PulsarAdminProxyBuilder.java |  64 -
 .../pulsar/common/config/PulsarClientFactory.java  |  52 +---
 .../pulsar/common/config/PulsarOptions.java|  81 +-
 .../handler/PulsarAdminInvocationHandler.java  | 145 --
 .../flink/connector/pulsar/sink/PulsarSink.java|   1 -
 .../connector/pulsar/sink/PulsarSinkBuilder.java   |  13 +-
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |   2 -
 .../sink/writer/context/PulsarSinkContext.java |   4 +-
 .../sink/writer/context/PulsarSinkContextImpl.java |   4 +-
 .../pulsar/sink/writer/topic/MetadataListener.java |  51 ++--
 .../connector/pulsar/source/PulsarSource.java  |   1 -
 .../pulsar/source/PulsarSourceBuilder.java |  11 +-
 .../source/config/PulsarSourceConfigUtils.java |   2 -
 .../source/enumerator/PulsarSourceEnumerator.java  |  18 +-
 .../source/enumerator/cursor/CursorPosition.java   |  77 +++--
 .../source/enumerator/cursor/StopCursor.java   |   4 +-
 .../cursor/start/MessageIdStartCursor.java |   2 +-
 .../cursor/stop/LatestMessageStopCursor.java   |  27 +-
 .../enumerator/subscriber/PulsarSubscriber.java|   4 +-
 .../subscriber/impl/BasePulsarSubscriber.java  |  34 +--
 .../subscriber/impl/TopicListSubscriber.java   |   5 +-
 .../source/reader/PulsarPartitionSplitReader.java  |  14 +-
 .../pulsar/source/reader/PulsarSourceReader.java   |   9 -
 .../pulsar/source/split/PulsarPartitionSplit.java  |   6 +-
 .../connector/pulsar/table/PulsarTableFactory.java |   9 +-
 .../connector/pulsar/table/PulsarTableOptions.java |  21 --
 .../pulsar/table/UpsertPulsarTableFactory.java |   9 +-
 .../handler/PulsarAdminInvocationHandlerTest.java  | 309 -
 .../pulsar/sink/PulsarSinkBuilderTest.java |   5 +-
 .../connector/pulsar/sink/PulsarSinkITCase.java|   1 -
 .../pulsar/source/PulsarSourceBuilderTest.java |   4 -
 .../source/enumerator/cursor/StopCursorTest.java   |   1 -
 .../subscriber/PulsarSubscriberTest.java   |  14 +-
 .../reader/PulsarPartitionSplitReaderTest.java |   1 -
 .../GenericRecordDeserializationSchemaTest.java|   1 -
 .../PulsarDeserializationSchemaTest.java   |   1 -
 .../pulsar/table/PulsarChangelogTableITCase.java   |   6 -
 .../pulsar/table/PulsarTableFactoryTest.java   |   9 -
 .../connector/pulsar/table/PulsarTableITCase.java  |  24 --
 .../pulsar/table/PulsarTableOptionsTest.java   |   6 +-
 .../pulsar/table/UpsertPulsarTableITCase.java  |  12 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   |   2 -
 .../testutils/sink/PulsarSinkTestContext.java  |   1 -
 .../testutils/source/PulsarSourceTestContext.java  |   1 -
 45 files changed, 131 insertions(+), 951 deletions(-)
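Among the deleted files is `PulsarAdminInvocationHandler`, whose name suggests the connector previously routed `PulsarAdmin` calls through a JDK dynamic proxy; this commit removes that layer. As a generic illustration of the invocation-handler pattern (this is a hypothetical sketch, not the Flink connector's actual code — the `Admin` interface and retry logic here are invented):

```java
import java.lang.reflect.Proxy;

public class InvocationHandlerSketch {
    // Hypothetical admin-style interface; the real PulsarAdmin API is much larger.
    interface Admin {
        String lookupTopic(String topic);
    }

    public static void main(String[] args) {
        Admin direct = topic -> "persistent://public/default/" + topic;

        // A dynamic proxy lets a single InvocationHandler intercept every
        // Admin call, e.g. to add retries or refresh authentication.
        Admin proxied =
                (Admin)
                        Proxy.newProxyInstance(
                                Admin.class.getClassLoader(),
                                new Class<?>[] {Admin.class},
                                (proxy, method, methodArgs) -> {
                                    try {
                                        return method.invoke(direct, methodArgs);
                                    } catch (Exception e) {
                                        // naive one-shot retry, purely illustrative
                                        return method.invoke(direct, methodArgs);
                                    }
                                });

        System.out.println(proxied.lookupTopic("orders"));
    }
}
```

Removing such a proxy trades the centralized interception point for plain, direct calls, which is simpler to trace and debug.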

diff --git a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 1638c6a..7a71d25 100644
--- a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -9,16 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or 

[flink] branch release-1.17 updated: [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient

2023-09-20 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 7c9e05ea8c6 [FLINK-32974][client] Avoid creating a new temporary 
directory every time for RestClusterClient
7c9e05ea8c6 is described below

commit 7c9e05ea8c67b12c657b60cd5e6d1bea52b4f9a3
Author: Lijie Wang 
AuthorDate: Wed Sep 20 12:04:25 2023 +0800

[FLINK-32974][client] Avoid creating a new temporary directory every time 
for RestClusterClient

This closes #23363
---
 .../client/program/rest/RestClusterClient.java | 35 --
 .../client/program/rest/RestClusterClientTest.java | 31 ++-
 2 files changed, 22 insertions(+), 44 deletions(-)
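The change itself is small: instead of creating one `flink-rest-client-jobgraphs` temp directory per client and placing job-graph files inside it, each submission now creates a single self-describing temp file in the system default location. A minimal stand-alone sketch of before vs. after (the `jobId` value is a placeholder; Flink derives it from `jobGraph.getJobID()`):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class JobGraphTempFileSketch {
    public static void main(String[] args) throws IOException {
        String jobId = "a1b2c3"; // placeholder job id

        // Old approach: an extra directory that had to be created up front
        // and carried around for the client's whole lifetime.
        Path tempDir = Files.createTempDirectory("flink-rest-client-jobgraphs");
        Path oldFile = Files.createTempFile(tempDir, "flink-jobgraph", ".bin");

        // New approach: one temp file per submission, named after the job,
        // in the JVM's default temp directory.
        Path newFile = Files.createTempFile("flink-jobgraph-" + jobId, ".bin");

        System.out.println(newFile.getFileName().toString().startsWith("flink-jobgraph-" + jobId));

        // clean up (file inside the directory first, then the directory)
        Files.deleteIfExists(oldFile);
        Files.deleteIfExists(tempDir);
        Files.deleteIfExists(newFile);
    }
}
```

Dropping the per-client directory also removes a field and a constructor parameter, which is why the diff deletes more lines than it adds.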

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 2ebcc14d5fe..a1ef5f7e0a9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -158,7 +158,6 @@ public class RestClusterClient implements 
ClusterClient {
 private static final Logger LOG = 
LoggerFactory.getLogger(RestClusterClient.class);
 
 private final RestClusterClientConfiguration 
restClusterClientConfiguration;
-private final java.nio.file.Path tempDir;
 
 private final Configuration configuration;
 
@@ -195,13 +194,7 @@ public class RestClusterClient implements 
ClusterClient {
 public RestClusterClient(
 Configuration config, T clusterId, 
ClientHighAvailabilityServicesFactory factory)
 throws Exception {
-this(
-config,
-null,
-clusterId,
-new ExponentialWaitStrategy(10L, 2000L),
-factory,
-Files.createTempDirectory("flink-rest-client-jobgraphs"));
+this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), 
factory);
 }
 
 @VisibleForTesting
@@ -216,24 +209,7 @@ public class RestClusterClient implements 
ClusterClient {
 restClient,
 clusterId,
 waitStrategy,
-Files.createTempDirectory("flink-rest-client-jobgraphs"));
-}
-
-@VisibleForTesting
-RestClusterClient(
-Configuration configuration,
-@Nullable RestClient restClient,
-T clusterId,
-WaitStrategy waitStrategy,
-java.nio.file.Path tmpDir)
-throws Exception {
-this(
-configuration,
-restClient,
-clusterId,
-waitStrategy,
-DefaultClientHighAvailabilityServicesFactory.INSTANCE,
-tmpDir);
+DefaultClientHighAvailabilityServicesFactory.INSTANCE);
 }
 
 private RestClusterClient(
@@ -241,14 +217,12 @@ public class RestClusterClient implements 
ClusterClient {
 @Nullable RestClient restClient,
 T clusterId,
 WaitStrategy waitStrategy,
-ClientHighAvailabilityServicesFactory clientHAServicesFactory,
-java.nio.file.Path tempDir)
+ClientHighAvailabilityServicesFactory clientHAServicesFactory)
 throws Exception {
 this.configuration = checkNotNull(configuration);
 
 this.restClusterClientConfiguration =
 
RestClusterClientConfiguration.fromConfiguration(configuration);
-this.tempDir = tempDir;
 
 if (restClient != null) {
 this.restClient = restClient;
@@ -354,7 +328,8 @@ public class RestClusterClient implements 
ClusterClient {
 () -> {
 try {
 final java.nio.file.Path jobGraphFile =
-Files.createTempFile(tempDir, "flink-jobgraph", ".bin");
+Files.createTempFile(
+"flink-jobgraph-" + jobGraph.getJobID(), ".bin");
 try (ObjectOutputStream objectOut =
 new ObjectOutputStream(
 
Files.newOutputStream(jobGraphFile))) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..7b44c6f469f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ 

[flink] branch release-1.18 updated: [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient

2023-09-20 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 2aeb99804ba [FLINK-32974][client] Avoid creating a new temporary 
directory every time for RestClusterClient
2aeb99804ba is described below

commit 2aeb99804ba56c008df0a1730f3246d3fea856b9
Author: Lijie Wang 
AuthorDate: Wed Sep 20 12:04:25 2023 +0800

[FLINK-32974][client] Avoid creating a new temporary directory every time 
for RestClusterClient

This closes #23363
---
 .../client/program/rest/RestClusterClient.java | 35 --
 .../client/program/rest/RestClusterClientTest.java | 31 ++-
 2 files changed, 22 insertions(+), 44 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 66b918180a6..4318d0a6c42 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -163,7 +163,6 @@ public class RestClusterClient implements 
ClusterClient {
 private static final Logger LOG = 
LoggerFactory.getLogger(RestClusterClient.class);
 
 private final RestClusterClientConfiguration 
restClusterClientConfiguration;
-private final java.nio.file.Path tempDir;
 
 private final Configuration configuration;
 
@@ -200,13 +199,7 @@ public class RestClusterClient implements 
ClusterClient {
 public RestClusterClient(
 Configuration config, T clusterId, 
ClientHighAvailabilityServicesFactory factory)
 throws Exception {
-this(
-config,
-null,
-clusterId,
-new ExponentialWaitStrategy(10L, 2000L),
-factory,
-Files.createTempDirectory("flink-rest-client-jobgraphs"));
+this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), 
factory);
 }
 
 @VisibleForTesting
@@ -221,24 +214,7 @@ public class RestClusterClient implements 
ClusterClient {
 restClient,
 clusterId,
 waitStrategy,
-Files.createTempDirectory("flink-rest-client-jobgraphs"));
-}
-
-@VisibleForTesting
-RestClusterClient(
-Configuration configuration,
-@Nullable RestClient restClient,
-T clusterId,
-WaitStrategy waitStrategy,
-java.nio.file.Path tmpDir)
-throws Exception {
-this(
-configuration,
-restClient,
-clusterId,
-waitStrategy,
-DefaultClientHighAvailabilityServicesFactory.INSTANCE,
-tmpDir);
+DefaultClientHighAvailabilityServicesFactory.INSTANCE);
 }
 
 private RestClusterClient(
@@ -246,14 +222,12 @@ public class RestClusterClient implements 
ClusterClient {
 @Nullable RestClient restClient,
 T clusterId,
 WaitStrategy waitStrategy,
-ClientHighAvailabilityServicesFactory clientHAServicesFactory,
-java.nio.file.Path tempDir)
+ClientHighAvailabilityServicesFactory clientHAServicesFactory)
 throws Exception {
 this.configuration = checkNotNull(configuration);
 
 this.restClusterClientConfiguration =
 
RestClusterClientConfiguration.fromConfiguration(configuration);
-this.tempDir = tempDir;
 
 if (restClient != null) {
 this.restClient = restClient;
@@ -359,7 +333,8 @@ public class RestClusterClient implements 
ClusterClient {
 () -> {
 try {
 final java.nio.file.Path jobGraphFile =
-Files.createTempFile(tempDir, "flink-jobgraph", ".bin");
+Files.createTempFile(
+"flink-jobgraph-" + jobGraph.getJobID(), ".bin");
 try (ObjectOutputStream objectOut =
 new ObjectOutputStream(
 
Files.newOutputStream(jobGraphFile))) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 740eb06a57b..e44174beb78 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ 

[flink] 02/02: [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7997cf152cc59b45c51977b825c379c0421b575e
Author: Yuxin Tan 
AuthorDate: Fri Sep 15 10:32:34 2023 +0800

[FLINK-33044][network] Reduce the frequency of triggering flush for the 
disk tier of tiered storage
---
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java|  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++
 .../tier/disk/DiskTierProducerAgentTest.java   |  1 +
 6 files changed, 75 insertions(+), 6 deletions(-)
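The mechanism the commit adds is a simple byte counter: appends accumulate until a threshold (512 KiB by default) is reached, and only then does the disk tier trigger a flush, instead of flushing on every append. A self-contained sketch of that logic (constant and field names mirror the diff, but the surrounding buffer/writer machinery is elided and replaced by a flush counter):

```java
public class FlushThresholdSketch {
    static final int MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024; // 512 KiB default

    private int numCachedBytesCounter; // task-thread only, so no locking needed
    private int flushCount;

    void append(int numBytes) {
        numCachedBytesCounter += numBytes;
        // Flush only once enough bytes have accumulated, not on every append.
        if (numCachedBytesCounter >= MAX_CACHED_BYTES_BEFORE_FLUSH) {
            flush();
        }
    }

    private void flush() {
        flushCount++; // stands in for handing buffers to the PartitionFileWriter
        numCachedBytesCounter = 0;
    }

    public static void main(String[] args) {
        FlushThresholdSketch cache = new FlushThresholdSketch();
        for (int i = 0; i < 1024; i++) {
            cache.append(1024); // 1 MiB total, appended in 1 KiB chunks
        }
        // 1 MiB of appends against a 512 KiB threshold => exactly two flushes
        System.out.println(cache.flushCount);
    }
}
```

Batching flushes this way reduces the number of write requests issued to the partition file at the cost of holding up to the threshold's worth of bytes in memory.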

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index d031d7278c4..abc6c955d53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -67,6 +67,8 @@ public class TieredStorageConfiguration {
 
 private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 
* 1024L;
 
+private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 
1024;
+
 private final String remoteStorageBasePath;
 
 private final int tieredStorageBufferSize;
@@ -330,6 +332,8 @@ public class TieredStorageConfiguration {
 
 private long numRetainedInMemoryRegionsMax = 
DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY;
 
+private int maxCachedBytesBeforeFlush = 
DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH;
+
 private List tierFactories;
 
 private List tierExclusiveBuffers;
@@ -416,6 +420,11 @@ public class TieredStorageConfiguration {
 return this;
 }
 
+public Builder setMaxCachedBytesBeforeFlush(int 
maxCachedBytesBeforeFlush) {
+this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
+return this;
+}
+
 public TieredStorageConfiguration build() {
 setupTierFactoriesAndExclusiveBuffers();
 return new TieredStorageConfiguration(
@@ -451,6 +460,7 @@ public class TieredStorageConfiguration {
 tieredStorageBufferSize,
 minReserveDiskSpaceFraction,
 regionGroupSizeInBytes,
+maxCachedBytesBeforeFlush,
 numRetainedInMemoryRegionsMax));
 tierExclusiveBuffers.add(diskTierExclusiveBuffers);
 if (remoteStorageBasePath != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index f5d7f30e0c0..feb3a1eb1d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -41,6 +41,8 @@ class DiskCacheManager {
 
 private final int numSubpartitions;
 
+private final int maxCachedBytesBeforeFlush;
+
 private final PartitionFileWriter partitionFileWriter;
 
 private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
@@ -48,13 +50,21 @@ class DiskCacheManager {
 /** Whether the current flush process has completed. */
 private CompletableFuture hasFlushCompleted;
 
+/**
+ * The number of all subpartition's cached bytes in the cache manager. 
Note that the counter can
+ * only be accessed by the task thread and does not require locks.
+ */
+private int numCachedBytesCounter;
+
 DiskCacheManager(
 TieredStoragePartitionId partitionId,
 int numSubpartitions,
+int maxCachedBytesBeforeFlush,
 TieredStorageMemoryManager memoryManager,
 PartitionFileWriter partitionFileWriter) {
 this.partitionId = partitionId;
 this.numSubpartitions = numSubpartitions;
+this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
 this.partitionFileWriter = partitionFileWriter;
 this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
 this.hasFlushCompleted = FutureUtils.completedVoidFuture();
@@ -81,6 +91,7 @@ class DiskCacheManager {
  */
 void append(Buffer buffer, int subpartitionId) {
   

[flink] 01/02: [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59368aedad90cda46883ed0bd0b8fa44aa75c0fe
Author: Yuxin Tan 
AuthorDate: Fri Sep 15 10:31:51 2023 +0800

[hotfix][network] Flush writing buffers when closing 
HashSubpartitionBufferAccumulator of tiered storage
---
 .../hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
index e610926a55d..4a5e9af3ea0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
@@ -75,6 +75,7 @@ public class HashSubpartitionBufferAccumulator {
 }
 
 public void close() {
+finishCurrentWritingBufferIfNotEmpty();
 while (!unfinishedBuffers.isEmpty()) {
 unfinishedBuffers.poll().close();
 }
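The one-line hotfix matters because `close()` previously drained only the queue of already-finished buffers; any bytes sitting in the current, partially-filled writing buffer were silently dropped. A toy model of that ordering (method and field names follow the diff, but the buffer type is simplified to a string and the real resource handling is elided):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AccumulatorCloseSketch {
    private final Deque<String> unfinishedBuffers = new ArrayDeque<>();
    private String currentWritingBuffer = "pending-bytes"; // partially filled
    private int finishedBuffers;

    private void finishCurrentWritingBufferIfNotEmpty() {
        if (!currentWritingBuffer.isEmpty()) {
            unfinishedBuffers.add(currentWritingBuffer);
            currentWritingBuffer = "";
        }
    }

    void close() {
        // The hotfix: without this call, "pending-bytes" never reaches the
        // queue and is lost when the accumulator shuts down.
        finishCurrentWritingBufferIfNotEmpty();
        while (!unfinishedBuffers.isEmpty()) {
            unfinishedBuffers.poll();
            finishedBuffers++;
        }
    }

    public static void main(String[] args) {
        AccumulatorCloseSketch acc = new AccumulatorCloseSketch();
        acc.close();
        System.out.println(acc.finishedBuffers);
    }
}
```

With the flush call in place the sketch drains one buffer; remove it and the pending buffer is dropped, which is the data-loss window the hotfix closes.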



[flink] branch release-1.18 updated (9c1318ca7fa -> 7997cf152cc)

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


from 9c1318ca7fa [FLINK-15736][docs] Add Java compatibility page
 new 59368aedad9 [hotfix][network] Flush writing buffers when closing 
HashSubpartitionBufferAccumulator of tiered storage
 new 7997cf152cc [FLINK-33044][network] Reduce the frequency of triggering 
flush for the disk tier of tiered storage

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++
 .../storage/HashSubpartitionBufferAccumulator.java |  1 +
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java|  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++
 .../tier/disk/DiskTierProducerAgentTest.java   |  1 +
 7 files changed, 76 insertions(+), 6 deletions(-)



[flink] 02/02: [FLINK-33050][docs] Add notice of data duplicates for RTAS in docs This closes #23389

2023-09-20 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5e3edb98f1f485fdafd8ce1fd88a930af7cf829
Author: Tartarus0zm 
AuthorDate: Wed Sep 20 11:55:54 2023 +0800

[FLINK-33050][docs] Add notice of data duplicates for RTAS in docs
This closes #23389
---
 docs/content.zh/docs/dev/table/sql/create.md | 1 +
 docs/content/docs/dev/table/sql/create.md| 1 +
 2 files changed, 2 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 7024b457585..f6c8674b6c8 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -615,6 +615,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM 
source_table WHERE mod(id, 1
 * 暂不支持主键约束。
 
 **注意:** 默认情况下,RTAS 是非原子性的,这意味着如果在向表中插入数据时发生错误,该表不会被自动删除或还原成原来的表。
+**注意:** RTAS 会先删除表,然后创建表并写入数据。但如果表是在基于内存的 Catalog 里,删除表只会将其从 Catalog 里移除,并不会移除物理表中的数据。因此,执行 RTAS 语句之前的数据仍然存在。
 
 ### 原子性
 
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 8dc17cd525d..3d94b111c53 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -616,6 +616,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM 
source_table WHERE mod(id, 1
 * Does not support specifying primary key constraints yet.
 
 **Note:** By default, RTAS is non-atomic, which means the table won't be dropped or restored to its original state automatically if errors occur while inserting data into the table.
+**Note:** RTAS will drop the table first, then create the table and insert the data. But if the table is in an in-memory catalog, dropping the table only removes it from the catalog without removing the data in the physical table. So the data that existed before the RTAS statement will still be present.
 
 ### Atomicity
 



[flink] 01/02: [FLINK-33050][table] Atomicity is not supported prompting the user to disable

2023-09-20 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e97ae3549f77578b36ee9d6da97d98fc6e76fa4
Author: Tartarus0zm 
AuthorDate: Wed Sep 20 11:55:39 2023 +0800

[FLINK-33050][table] Atomicity is not supported prompting the user to 
disable
---
 .../table/api/internal/TableEnvironmentImpl.java   | 36 ++
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1b9648f6cb6..d25df607fd1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -143,6 +143,7 @@ import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
  */
 @Internal
 public class TableEnvironmentImpl implements TableEnvironmentInternal {
+
 // Flag that tells if the TableSource/TableSink used in this environment 
is stream table
 // source/sink,
 // and this should always be true. This avoids too many hard code.
@@ -951,18 +952,29 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
 createTableOperation.getTableIdentifier(),
 catalogTable,
 createTableOperation.isTemporary())) {
-DynamicTableSink dynamicTableSink =
-ExecutableOperationUtils.createDynamicTableSink(
-catalog,
-() -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
-createTableOperation.getTableIdentifier(),
-catalogTable,
-Collections.emptyMap(),
-tableConfig,
-resourceManager.getUserClassLoader(),
-createTableOperation.isTemporary());
-if (dynamicTableSink instanceof SupportsStaging) {
-return Optional.of(dynamicTableSink);
+try {
+DynamicTableSink dynamicTableSink =
+ExecutableOperationUtils.createDynamicTableSink(
+catalog,
+() -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
+createTableOperation.getTableIdentifier(),
+catalogTable,
+Collections.emptyMap(),
+tableConfig,
+resourceManager.getUserClassLoader(),
+createTableOperation.isTemporary());
+if (dynamicTableSink instanceof SupportsStaging) {
+return Optional.of(dynamicTableSink);
+}
+} catch (Exception e) {
+throw new TableException(
+String.format(
+"Fail to create DynamicTableSink for the 
table %s, "
++ "maybe the table does not 
support atomicity of CTAS/RTAS, "
++ "please set %s to false and try 
again.",
+createTableOperation.getTableIdentifier(),
+
TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()),
+e);
 }
 }
 }



[flink] branch master updated (f2cb1d24728 -> d5e3edb98f1)

2023-09-20 Thread yuxia
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f2cb1d24728 [FLINK-32848][tests][JUnit5 migration] Migrate 
flink-runtime/rpc tests to JUnit5 (#23301)
 new 0e97ae3549f [FLINK-33050][table] Atomicity is not supported prompting 
the user to disable
 new d5e3edb98f1 [FLINK-33050][docs] Add notice of data duplicates for RTAS 
in docs This closes #23389

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/dev/table/sql/create.md   |  1 +
 docs/content/docs/dev/table/sql/create.md  |  1 +
 .../table/api/internal/TableEnvironmentImpl.java   | 36 ++
 3 files changed, 26 insertions(+), 12 deletions(-)



[flink] branch master updated: [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 (#23301)

2023-09-20 Thread zjureel
This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f2cb1d24728 [FLINK-32848][tests][JUnit5 migration] Migrate 
flink-runtime/rpc tests to JUnit5 (#23301)
f2cb1d24728 is described below

commit f2cb1d247283344e9194e63931a2948e09f73c93
Author: Zhanghao Chen 
AuthorDate: Thu Sep 21 08:45:59 2023 +0800

[FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to 
JUnit5 (#23301)

* [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to 
JUnit5
---
 .../apache/flink/runtime/rpc/AsyncCallsTest.java   |  44 
 .../flink/runtime/rpc/FencedRpcEndpointTest.java   |  49 -
 .../flink/runtime/rpc/RpcConnectionTest.java   |   7 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 112 +
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |  29 +++---
 5 files changed, 109 insertions(+), 132 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 0ffad0f098e..4be039e1340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -20,11 +20,10 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.UUID;
@@ -35,11 +34,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class AsyncCallsTest extends TestLogger {
+class AsyncCallsTest {
 
 // 
 //  shared test members
@@ -49,14 +46,13 @@ public class AsyncCallsTest extends TestLogger {
 
 private static RpcService rpcService;
 
-@BeforeClass
-public static void setup() throws Exception {
+@BeforeAll
+static void setup() throws Exception {
 rpcService = RpcSystem.load().localServiceBuilder(new 
Configuration()).createAndStart();
 }
 
-@AfterClass
-public static void shutdown()
-throws InterruptedException, ExecutionException, TimeoutException {
+@AfterAll
+static void shutdown() throws InterruptedException, ExecutionException, 
TimeoutException {
 rpcService.closeAsync().get();
 }
 
@@ -65,12 +61,12 @@ public class AsyncCallsTest extends TestLogger {
 // 
 
 @Test
-public void testScheduleWithNoDelay() throws Exception {
+void testScheduleWithNoDelay() throws Exception {
 runScheduleWithNoDelayTest(TestEndpoint::new);
 }
 
 @Test
-public void testFencedScheduleWithNoDelay() throws Exception {
+void testFencedScheduleWithNoDelay() throws Exception {
 runScheduleWithNoDelayTest(FencedTestEndpoint::new);
 }
 
@@ -117,22 +113,24 @@ public class AsyncCallsTest extends TestLogger {
 Duration.ofSeconds(30L));
 
 String str = result.get(30, TimeUnit.SECONDS);
-assertEquals("test", str);
+assertThat(str).isEqualTo("test");
 
 // validate that no concurrent access happened
-assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+assertThat(concurrentAccess)
+.withFailMessage("Rpc Endpoint had concurrent access")
+.isFalse();
 } finally {
 RpcUtils.terminateRpcEndpoint(rpcEndpoint);
 }
 }
 
 @Test
-public void testScheduleWithDelay() throws Exception {
+void testScheduleWithDelay() throws Exception {
 runScheduleWithDelayTest(TestEndpoint::new);
 }
 
 @Test
-public void testFencedScheduleWithDelay() throws Exception {
+void testFencedScheduleWithDelay() throws Exception {
 runScheduleWithDelayTest(FencedTestEndpoint::new);
 }
 
@@ -178,9 +176,13 @@ public class AsyncCallsTest extends TestLogger {
 final long stop = System.nanoTime();
 
 // validate that no concurrent access happened
-assertFalse("Rpc Endpoint had concurrent 

[flink] branch release-1.18 updated: [FLINK-15736][docs] Add Java compatibility page

2023-09-20 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 9c1318ca7fa [FLINK-15736][docs] Add Java compatibility page
9c1318ca7fa is described below

commit 9c1318ca7fa5b2e7b11827068ad1288483aaa464
Author: Chesnay Schepler 
AuthorDate: Wed Sep 20 22:07:04 2023 +0200

[FLINK-15736][docs] Add Java compatibility page
---
 .../docs/deployment/java_compatibility.md  | 77 ++
 docs/content/docs/deployment/java_compatibility.md | 77 ++
 docs/content/docs/deployment/memory/_index.md  |  2 +-
 3 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/deployment/java_compatibility.md 
b/docs/content.zh/docs/deployment/java_compatibility.md
new file mode 100644
index 000..f2e4cb93eb4
--- /dev/null
+++ b/docs/content.zh/docs/deployment/java_compatibility.md
@@ -0,0 +1,77 @@
+---
+title: Java Compatibility
+weight: 2
+type: docs
+---
+
+
+# Java compatibility
+
+This page lists which Java versions Flink supports and what limitations apply 
(if any).
+
+## Java 8 (deprecated)
+
+Support for Java 8 has been deprecated in 1.15.0.
+It is recommended to migrate to Java 11.
+
+## Java 11
+
+Support for Java 11 was added in 1.10.0 and is the recommended Java version to 
run Flink on.
+
+This is the default version for docker images.
+
+### Untested Flink features
+
+The following Flink features have not been tested with Java 11:
+
+* Hive connector
+* HBase 1.x connector
+
+### Untested language features
+
+* Modularized user jars have not been tested.
+
+## Java 17
+
+Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736))
+
+### Untested Flink features
+
+These Flink features have not been tested with Java 17:
+
+* Hive connector
+* HBase 1.x connector
+
+### JDK modularization
+
+Starting with Java 16, Java applications have to fully cooperate with JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/).
+This means that access to JDK internals must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments.
+
+Since Flink uses reflection for serializing user-defined functions and data 
(via Kryo), this means that if your UDFs or data types use JDK classes you may 
have to allow access to these JDK classes.
+
+These should be configured via the [env.java.opts.all]({{< ref 
"docs/deployment/config" >}}#env-java-opts-all) option.
+
+In the default configuration of the Flink distribution, this option is set such that Flink itself works on Java 17.
+The list of configured arguments may only be extended, never shortened.
+
+### Known issues
+
+* Java records are not supported. See 
[FLINK-32380](https://issues.apache.org/jira/browse/FLINK-32380) for updates.
+* SIGSEGV in C2 Compiler thread: Early Java 17 builds are affected by a bug 
where the JVM can fail abruptly. Update your Java 17 installation to resolve 
the issue. See [JDK-8277529](https://bugs.openjdk.org/browse/JDK-8277529) for 
details.
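For example, a job whose UDFs reflect over `java.util` internals might append an extra `--add-opens` to the distribution's default list. The module names below are illustrative only; the actual default list ships with the Flink distribution and varies by version:

```yaml
# flink-conf.yaml -- illustrative; start from the distribution's default
# env.java.opts.all value and only append arguments, never remove any.
env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED
```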
diff --git a/docs/content/docs/deployment/java_compatibility.md 
b/docs/content/docs/deployment/java_compatibility.md
new file mode 100644
index 000..f2e4cb93eb4
--- /dev/null
+++ b/docs/content/docs/deployment/java_compatibility.md
@@ -0,0 +1,77 @@
+---
+title: Java Compatibility
+weight: 2
+type: docs
+---
+
+
+# Java compatibility
+
+This page lists which Java versions Flink supports and what limitations apply 
(if any).
+
+## Java 8 (deprecated)
+
+Support for Java 8 has been deprecated in 1.15.0.
+It is recommended to migrate to Java 11.
+
+## Java 11
+
+Support for Java 11 was added in 1.10.0 and is the recommended Java version to 
run Flink on.
+
+This is the default version for docker images.
+
+### Untested Flink features
+
+The following Flink features have not been tested with Java 11:
+
+* Hive connector
+* HBase 1.x connector
+
+### Untested language features
+
+* Modularized user jars have not been tested.
+
+## Java 17
+
+Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736))
+
+### Untested Flink features
+
+These Flink features have not been tested with Java 17:
+
+* Hive connector
+* HBase 1.x connector
+
+### JDK modularization
+
+Starting with Java 16, Java applications have to fully cooperate with JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/).
+This means that access to JDK internals must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments.
+
+Since Flink uses reflection for serializing user-defined 

[flink] branch master updated: [FLINK-15736][docs] Add Java compatibility page

2023-09-20 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 388601f1b75 [FLINK-15736][docs] Add Java compatibility page
388601f1b75 is described below

commit 388601f1b75abd443e149aaa4584d366072a1b0e
Author: Chesnay Schepler 
AuthorDate: Wed Sep 20 22:07:04 2023 +0200

[FLINK-15736][docs] Add Java compatibility page
---
 .../docs/deployment/java_compatibility.md  | 77 ++
 docs/content/docs/deployment/java_compatibility.md | 77 ++
 docs/content/docs/deployment/memory/_index.md  |  2 +-
 3 files changed, 155 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/deployment/java_compatibility.md 
b/docs/content.zh/docs/deployment/java_compatibility.md
new file mode 100644
index 000..f2e4cb93eb4
--- /dev/null
+++ b/docs/content.zh/docs/deployment/java_compatibility.md
@@ -0,0 +1,77 @@
+---
+title: Java Compatibility
+weight: 2
+type: docs
+---
+
+
+# Java compatibility
+
+This page lists which Java versions Flink supports and what limitations apply 
(if any).
+
+## Java 8 (deprecated)
+
+Support for Java 8 has been deprecated in 1.15.0.
+It is recommended to migrate to Java 11.
+
+## Java 11
+
+Support for Java 11 was added in 1.10.0 and is the recommended Java version to 
run Flink on.
+
+This is the default version for docker images.
+
+### Untested Flink features
+
+The following Flink features have not been tested with Java 11:
+
+* Hive connector
+* HBase 1.x connector
+
+### Untested language features
+
+* Modularized user jars have not been tested.
+
+## Java 17
+
+Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736))
+
+### Untested Flink features
+
+These Flink features have not been tested with Java 17:
+
+* Hive connector
+* HBase 1.x connector
+
+### JDK modularization
+
+Starting with Java 16, Java applications have to fully cooperate with JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/).
+This means that access to JDK internals must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments.
+
+Since Flink uses reflection for serializing user-defined functions and data 
(via Kryo), this means that if your UDFs or data types use JDK classes you may 
have to allow access to these JDK classes.
+
+These should be configured via the [env.java.opts.all]({{< ref 
"docs/deployment/config" >}}#env-java-opts-all) option.
+
+In the default configuration of the Flink distribution, this option is set such that Flink itself works on Java 17.
+The list of configured arguments may only be extended, never shortened.
+
+### Known issues
+
+* Java records are not supported. See 
[FLINK-32380](https://issues.apache.org/jira/browse/FLINK-32380) for updates.
+* SIGSEGV in C2 Compiler thread: Early Java 17 builds are affected by a bug 
where the JVM can fail abruptly. Update your Java 17 installation to resolve 
the issue. See [JDK-8277529](https://bugs.openjdk.org/browse/JDK-8277529) for 
details.
diff --git a/docs/content/docs/deployment/java_compatibility.md 
b/docs/content/docs/deployment/java_compatibility.md
new file mode 100644
index 000..f2e4cb93eb4
--- /dev/null
+++ b/docs/content/docs/deployment/java_compatibility.md
@@ -0,0 +1,77 @@
+---
+title: Java Compatibility
+weight: 2
+type: docs
+---
+
+
+# Java compatibility
+
+This page lists which Java versions Flink supports and what limitations apply 
(if any).
+
+## Java 8 (deprecated)
+
+Support for Java 8 has been deprecated in 1.15.0.
+It is recommended to migrate to Java 11.
+
+## Java 11
+
+Support for Java 11 was added in 1.10.0 and is the recommended Java version to 
run Flink on.
+
+This is the default version for docker images.
+
+### Untested Flink features
+
+The following Flink features have not been tested with Java 11:
+
+* Hive connector
+* HBase 1.x connector
+
+### Untested language features
+
+* Modularized user jars have not been tested.
+
+## Java 17
+
+Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736))
+
+### Untested Flink features
+
+These Flink features have not been tested with Java 17:
+
+* Hive connector
+* HBase 1.x connector
+
+### JDK modularization
+
+Starting with Java 16, Java applications have to fully cooperate with JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/).
+This means that access to JDK internals must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments.
+
+Since Flink uses reflection for serializing user-defined functions and data 

[flink] branch master updated (5e3abe28f62 -> 5a8321f6385)

2023-09-20 Thread huweihua
This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 5e3abe28f62 [FLINK-33024][table-planner][JUnit5 Migration] Module: 
flink-table-planner (JsonPlanTestBase) (#23353)
 add db53ca19361 [hotfix][docs] Update the parameter type of 
JobVertex#finalizeOnMaster in JavaDocs
 add 5a8321f6385 [FLINK-32846][runtime][JUnit5 Migration] The metrics 
package of flink-runtime module

No new revisions were added by this update.

Summary of changes:
 .../executiongraph/ExecutionGraphTestUtils.java|   2 +-
 .../executiongraph/FinalizeOnMasterTest.java   |   4 +-
 .../DescriptiveStatisticsHistogramTest.java|   7 +-
 .../runtime/metrics/MetricRegistryImplTest.java|   3 -
 .../flink/runtime/metrics/ReporterSetupTest.java   |  89 -
 .../flink/runtime/metrics/ThresholdMeterTest.java  |  60 +++---
 .../flink/runtime/metrics/TimerGaugeTest.java  |  65 +++
 .../metrics/dump/MetricDumpSerializerTest.java |   2 +-
 .../flink/runtime/metrics/dump/MetricDumpTest.java |  67 +++
 .../metrics/dump/MetricQueryServiceTest.java   |  78 
 .../metrics/filter/DefaultMetricFilterTest.java|  68 +++
 .../metrics/groups/AbstractMetricGroupTest.java| 132 ++---
 .../metrics/groups/FrontMetricGroupTest.java   |  38 ++--
 .../metrics/groups/InternalOperatorGroupTest.java  |  88 -
 .../metrics/groups/JobManagerGroupTest.java|  43 ++---
 .../metrics/groups/JobManagerJobGroupTest.java |  41 ++--
 .../groups/JobManagerOperatorGroupTest.java|   6 +-
 .../groups/MetricGroupRegistrationTest.java|  26 ++-
 .../runtime/metrics/groups/MetricGroupTest.java| 213 -
 .../metrics/groups/TaskIOMetricGroupTest.java  |   2 +-
 .../metrics/groups/TaskManagerGroupTest.java   |  80 
 .../metrics/groups/TaskManagerJobGroupTest.java|  57 +++---
 .../metrics/groups/TaskManagerMetricGroupTest.java |  36 ++--
 .../metrics/groups/TaskMetricGroupTest.java| 105 +-
 .../runtime/metrics/util/MetricUtilsTest.java  |  80 
 .../metrics/utils/SystemResourcesCounterTest.java  |  23 +--
 26 files changed, 680 insertions(+), 735 deletions(-)



[flink] 01/02: [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 887c66a93cf3db5c6be0461f4ce4b1be26b6c5c6
Author: Yuxin Tan 
AuthorDate: Fri Sep 15 10:31:51 2023 +0800

[hotfix][network] Flush writing buffers when closing 
HashSubpartitionBufferAccumulator of tiered storage
---
 .../hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
index e610926a55d..4a5e9af3ea0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
@@ -75,6 +75,7 @@ public class HashSubpartitionBufferAccumulator {
 }
 
 public void close() {
+finishCurrentWritingBufferIfNotEmpty();
 while (!unfinishedBuffers.isEmpty()) {
 unfinishedBuffers.poll().close();
 }
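The one-line fix above makes `close()` hand off the partially-filled in-progress buffer before recycling the queued unfinished ones, so bytes cached at close time are not silently dropped. A minimal stand-in sketch of that ordering — field and method names mirror the patch but the types here are simplified placeholders, not the real Flink classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AccumulatorSketch {
    private final Deque<StringBuilder> unfinishedBuffers = new ArrayDeque<>();
    private StringBuilder currentWritingBuffer = new StringBuilder("pending");
    private final StringBuilder flushed = new StringBuilder();

    void finishCurrentWritingBufferIfNotEmpty() {
        if (currentWritingBuffer.length() > 0) {
            flushed.append(currentWritingBuffer); // hand off cached data downstream
            currentWritingBuffer = new StringBuilder();
        }
    }

    void close() {
        finishCurrentWritingBufferIfNotEmpty(); // the one-line fix: flush first
        while (!unfinishedBuffers.isEmpty()) {
            unfinishedBuffers.poll().setLength(0); // then recycle remaining buffers
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch accumulator = new AccumulatorSketch();
        accumulator.close();
        // Without the flush-first call, "pending" would be lost on close.
        System.out.println("flushed=" + accumulator.flushed);
    }
}
```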



[flink] branch master updated (5a8321f6385 -> 6034d5ff335)

2023-09-20 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 5a8321f6385 [FLINK-32846][runtime][JUnit5 Migration] The metrics 
package of flink-runtime module
 add 6034d5ff335 [FLINK-32974][client] Avoid creating a new temporary 
directory every time for RestClusterClient

No new revisions were added by this update.

Summary of changes:
 .../client/program/rest/RestClusterClient.java | 35 --
 .../client/program/rest/RestClusterClientTest.java | 31 ++-
 2 files changed, 22 insertions(+), 44 deletions(-)



[flink] 02/02: [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b076c52d8da914e81c3e004c0b0c7883463bb151
Author: Yuxin Tan 
AuthorDate: Fri Sep 15 10:32:34 2023 +0800

[FLINK-33044][network] Reduce the frequency of triggering flush for the 
disk tier of tiered storage
---
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java|  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++
 .../tier/disk/DiskTierProducerAgentTest.java   |  1 +
 6 files changed, 75 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index d031d7278c4..abc6c955d53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -67,6 +67,8 @@ public class TieredStorageConfiguration {
 
 private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 
* 1024L;
 
+private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 
1024;
+
 private final String remoteStorageBasePath;
 
 private final int tieredStorageBufferSize;
@@ -330,6 +332,8 @@ public class TieredStorageConfiguration {
 
 private long numRetainedInMemoryRegionsMax = 
DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY;
 
+private int maxCachedBytesBeforeFlush = 
DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH;
+
 private List tierFactories;
 
 private List tierExclusiveBuffers;
@@ -416,6 +420,11 @@ public class TieredStorageConfiguration {
 return this;
 }
 
+public Builder setMaxCachedBytesBeforeFlush(int 
maxCachedBytesBeforeFlush) {
+this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
+return this;
+}
+
 public TieredStorageConfiguration build() {
 setupTierFactoriesAndExclusiveBuffers();
 return new TieredStorageConfiguration(
@@ -451,6 +460,7 @@ public class TieredStorageConfiguration {
 tieredStorageBufferSize,
 minReserveDiskSpaceFraction,
 regionGroupSizeInBytes,
+maxCachedBytesBeforeFlush,
 numRetainedInMemoryRegionsMax));
 tierExclusiveBuffers.add(diskTierExclusiveBuffers);
 if (remoteStorageBasePath != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index f5d7f30e0c0..feb3a1eb1d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -41,6 +41,8 @@ class DiskCacheManager {
 
 private final int numSubpartitions;
 
+private final int maxCachedBytesBeforeFlush;
+
 private final PartitionFileWriter partitionFileWriter;
 
 private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
@@ -48,13 +50,21 @@ class DiskCacheManager {
 /** Whether the current flush process has completed. */
 private CompletableFuture hasFlushCompleted;
 
+/**
+ * The number of all subpartition's cached bytes in the cache manager. 
Note that the counter can
+ * only be accessed by the task thread and does not require locks.
+ */
+private int numCachedBytesCounter;
+
 DiskCacheManager(
 TieredStoragePartitionId partitionId,
 int numSubpartitions,
+int maxCachedBytesBeforeFlush,
 TieredStorageMemoryManager memoryManager,
 PartitionFileWriter partitionFileWriter) {
 this.partitionId = partitionId;
 this.numSubpartitions = numSubpartitions;
+this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
 this.partitionFileWriter = partitionFileWriter;
 this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
 this.hasFlushCompleted = FutureUtils.completedVoidFuture();
@@ -81,6 +91,7 @@ class DiskCacheManager {
  */
 void append(Buffer buffer, int subpartitionId) {
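The patch batches disk flushes behind a byte threshold: each append bumps a counter that only the task thread touches (so no lock is needed), and a flush is triggered once the counter crosses `maxCachedBytesBeforeFlush` (512 KiB by default). An illustrative, simplified stand-in for that logic — not the real Flink classes, and the exact trigger condition in the patch may differ:

```java
public class DiskCacheSketch {
    private final int maxCachedBytesBeforeFlush;
    private int numCachedBytesCounter; // task-thread-only, so unsynchronized
    private int flushCount;

    DiskCacheSketch(int maxCachedBytesBeforeFlush) {
        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
    }

    void append(int bufferBytes) {
        numCachedBytesCounter += bufferBytes;
        if (numCachedBytesCounter >= maxCachedBytesBeforeFlush) {
            flush(); // only flush once enough bytes have accumulated
        }
    }

    private void flush() {
        flushCount++;              // stand-in for writing cached buffers to disk
        numCachedBytesCounter = 0; // counter resets after each flush
    }

    public static void main(String[] args) {
        DiskCacheSketch cache = new DiskCacheSketch(512 * 1024);
        for (int i = 0; i < 100; i++) {
            cache.append(32 * 1024); // 100 appends of 32 KiB = 3200 KiB total
        }
        // With a 512 KiB threshold, a flush fires every 16 appends: 6 in total,
        // instead of one per append as before the change.
        System.out.println("flushes=" + cache.flushCount);
    }
}
```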
 

[flink] branch master updated (6034d5ff335 -> b076c52d8da)

2023-09-20 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 6034d5ff335 [FLINK-32974][client] Avoid creating a new temporary 
directory every time for RestClusterClient
 new 887c66a93cf [hotfix][network] Flush writing buffers when closing 
HashSubpartitionBufferAccumulator of tiered storage
 new b076c52d8da [FLINK-33044][network] Reduce the frequency of triggering 
flush for the disk tier of tiered storage

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++
 .../storage/HashSubpartitionBufferAccumulator.java |  1 +
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java|  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++
 .../tier/disk/DiskTierProducerAgentTest.java   |  1 +
 7 files changed, 76 insertions(+), 6 deletions(-)