[flink-connector-pulsar] branch main updated: [FLINK-32938] Replace pulsar admin calls (#59)
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git The following commit(s) were added to refs/heads/main by this push: new 78d00ea [FLINK-32938] Replace pulsar admin calls (#59) 78d00ea is described below commit 78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858 Author: Neng Lu AuthorDate: Wed Sep 20 20:14:43 2023 -0700 [FLINK-32938] Replace pulsar admin calls (#59) --- .../f4d91193-72ba-4ce4-ad83-98f780dce581 | 14 +- .../common/config/PulsarAdminProxyBuilder.java | 64 - .../pulsar/common/config/PulsarClientFactory.java | 52 +--- .../pulsar/common/config/PulsarOptions.java| 81 +- .../handler/PulsarAdminInvocationHandler.java | 145 -- .../flink/connector/pulsar/sink/PulsarSink.java| 1 - .../connector/pulsar/sink/PulsarSinkBuilder.java | 13 +- .../pulsar/sink/config/PulsarSinkConfigUtils.java | 2 - .../sink/writer/context/PulsarSinkContext.java | 4 +- .../sink/writer/context/PulsarSinkContextImpl.java | 4 +- .../pulsar/sink/writer/topic/MetadataListener.java | 51 ++-- .../connector/pulsar/source/PulsarSource.java | 1 - .../pulsar/source/PulsarSourceBuilder.java | 11 +- .../source/config/PulsarSourceConfigUtils.java | 2 - .../source/enumerator/PulsarSourceEnumerator.java | 18 +- .../source/enumerator/cursor/CursorPosition.java | 77 +++-- .../source/enumerator/cursor/StopCursor.java | 4 +- .../cursor/start/MessageIdStartCursor.java | 2 +- .../cursor/stop/LatestMessageStopCursor.java | 27 +- .../enumerator/subscriber/PulsarSubscriber.java| 4 +- .../subscriber/impl/BasePulsarSubscriber.java | 34 +-- .../subscriber/impl/TopicListSubscriber.java | 5 +- .../source/reader/PulsarPartitionSplitReader.java | 14 +- .../pulsar/source/reader/PulsarSourceReader.java | 9 - .../pulsar/source/split/PulsarPartitionSplit.java | 6 +- .../connector/pulsar/table/PulsarTableFactory.java | 9 +- .../connector/pulsar/table/PulsarTableOptions.java | 21 -- 
.../pulsar/table/UpsertPulsarTableFactory.java | 9 +- .../handler/PulsarAdminInvocationHandlerTest.java | 309 - .../pulsar/sink/PulsarSinkBuilderTest.java | 5 +- .../connector/pulsar/sink/PulsarSinkITCase.java| 1 - .../pulsar/source/PulsarSourceBuilderTest.java | 4 - .../source/enumerator/cursor/StopCursorTest.java | 1 - .../subscriber/PulsarSubscriberTest.java | 14 +- .../reader/PulsarPartitionSplitReaderTest.java | 1 - .../GenericRecordDeserializationSchemaTest.java| 1 - .../PulsarDeserializationSchemaTest.java | 1 - .../pulsar/table/PulsarChangelogTableITCase.java | 6 - .../pulsar/table/PulsarTableFactoryTest.java | 9 - .../connector/pulsar/table/PulsarTableITCase.java | 24 -- .../pulsar/table/PulsarTableOptionsTest.java | 6 +- .../pulsar/table/UpsertPulsarTableITCase.java | 12 +- .../testutils/runtime/PulsarRuntimeOperator.java | 2 - .../testutils/sink/PulsarSinkTestContext.java | 1 - .../testutils/source/PulsarSourceTestContext.java | 1 - 45 files changed, 131 insertions(+), 951 deletions(-) diff --git a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 index 1638c6a..7a71d25 100644 --- a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 +++ b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 @@ -9,16 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule -org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\ -* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or
[flink] branch release-1.17 updated: [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 7c9e05ea8c6 [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient 7c9e05ea8c6 is described below commit 7c9e05ea8c67b12c657b60cd5e6d1bea52b4f9a3 Author: Lijie Wang AuthorDate: Wed Sep 20 12:04:25 2023 +0800 [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient This closes #23363 --- .../client/program/rest/RestClusterClient.java | 35 -- .../client/program/rest/RestClusterClientTest.java | 31 ++- 2 files changed, 22 insertions(+), 44 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 2ebcc14d5fe..a1ef5f7e0a9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -158,7 +158,6 @@ public class RestClusterClient implements ClusterClient { private static final Logger LOG = LoggerFactory.getLogger(RestClusterClient.class); private final RestClusterClientConfiguration restClusterClientConfiguration; -private final java.nio.file.Path tempDir; private final Configuration configuration; @@ -195,13 +194,7 @@ public class RestClusterClient implements ClusterClient { public RestClusterClient( Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory) throws Exception { -this( -config, -null, -clusterId, -new ExponentialWaitStrategy(10L, 2000L), -factory, -Files.createTempDirectory("flink-rest-client-jobgraphs")); +this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory); } @VisibleForTesting @@ -216,24 
+209,7 @@ public class RestClusterClient implements ClusterClient { restClient, clusterId, waitStrategy, -Files.createTempDirectory("flink-rest-client-jobgraphs")); -} - -@VisibleForTesting -RestClusterClient( -Configuration configuration, -@Nullable RestClient restClient, -T clusterId, -WaitStrategy waitStrategy, -java.nio.file.Path tmpDir) -throws Exception { -this( -configuration, -restClient, -clusterId, -waitStrategy, -DefaultClientHighAvailabilityServicesFactory.INSTANCE, -tmpDir); +DefaultClientHighAvailabilityServicesFactory.INSTANCE); } private RestClusterClient( @@ -241,14 +217,12 @@ public class RestClusterClient implements ClusterClient { @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy, -ClientHighAvailabilityServicesFactory clientHAServicesFactory, -java.nio.file.Path tempDir) +ClientHighAvailabilityServicesFactory clientHAServicesFactory) throws Exception { this.configuration = checkNotNull(configuration); this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration); -this.tempDir = tempDir; if (restClient != null) { this.restClient = restClient; @@ -354,7 +328,8 @@ public class RestClusterClient implements ClusterClient { () -> { try { final java.nio.file.Path jobGraphFile = -Files.createTempFile(tempDir, "flink-jobgraph", ".bin"); +Files.createTempFile( +"flink-jobgraph-" + jobGraph.getJobID(), ".bin"); try (ObjectOutputStream objectOut = new ObjectOutputStream( Files.newOutputStream(jobGraphFile))) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 2ff6dea2a98..7b44c6f469f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@
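The core of this change is visible in the last hunk: instead of holding a dedicated `flink-rest-client-jobgraphs` temp directory for the lifetime of every `RestClusterClient`, the client now creates each serialized job-graph file directly in the system default temp directory, with the job id in the prefix so concurrent submissions do not collide. A minimal sketch of that pattern (the class and method names here are illustrative, not Flink's actual code; `jobId` stands in for `jobGraph.getJobID()`):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class JobGraphTempFile {
    // Create a per-job temp file in the JVM's default temp directory.
    // Embedding the job id in the prefix keeps concurrent submissions
    // from colliding, with no long-lived directory to manage or clean up.
    static Path createJobGraphFile(String jobId) {
        try {
            return Files.createTempFile("flink-jobgraph-" + jobId, ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design upside is that no `tempDir` field has to be threaded through every constructor overload, which is exactly why the commit can delete two constructors outright.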
[flink] branch release-1.18 updated: [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 2aeb99804ba [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient 2aeb99804ba is described below commit 2aeb99804ba56c008df0a1730f3246d3fea856b9 Author: Lijie Wang AuthorDate: Wed Sep 20 12:04:25 2023 +0800 [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient This closes #23363 --- .../client/program/rest/RestClusterClient.java | 35 -- .../client/program/rest/RestClusterClientTest.java | 31 ++- 2 files changed, 22 insertions(+), 44 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 66b918180a6..4318d0a6c42 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -163,7 +163,6 @@ public class RestClusterClient implements ClusterClient { private static final Logger LOG = LoggerFactory.getLogger(RestClusterClient.class); private final RestClusterClientConfiguration restClusterClientConfiguration; -private final java.nio.file.Path tempDir; private final Configuration configuration; @@ -200,13 +199,7 @@ public class RestClusterClient implements ClusterClient { public RestClusterClient( Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory) throws Exception { -this( -config, -null, -clusterId, -new ExponentialWaitStrategy(10L, 2000L), -factory, -Files.createTempDirectory("flink-rest-client-jobgraphs")); +this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory); } @VisibleForTesting @@ -221,24 
+214,7 @@ public class RestClusterClient implements ClusterClient { restClient, clusterId, waitStrategy, -Files.createTempDirectory("flink-rest-client-jobgraphs")); -} - -@VisibleForTesting -RestClusterClient( -Configuration configuration, -@Nullable RestClient restClient, -T clusterId, -WaitStrategy waitStrategy, -java.nio.file.Path tmpDir) -throws Exception { -this( -configuration, -restClient, -clusterId, -waitStrategy, -DefaultClientHighAvailabilityServicesFactory.INSTANCE, -tmpDir); +DefaultClientHighAvailabilityServicesFactory.INSTANCE); } private RestClusterClient( @@ -246,14 +222,12 @@ public class RestClusterClient implements ClusterClient { @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy, -ClientHighAvailabilityServicesFactory clientHAServicesFactory, -java.nio.file.Path tempDir) +ClientHighAvailabilityServicesFactory clientHAServicesFactory) throws Exception { this.configuration = checkNotNull(configuration); this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration); -this.tempDir = tempDir; if (restClient != null) { this.restClient = restClient; @@ -359,7 +333,8 @@ public class RestClusterClient implements ClusterClient { () -> { try { final java.nio.file.Path jobGraphFile = -Files.createTempFile(tempDir, "flink-jobgraph", ".bin"); +Files.createTempFile( +"flink-jobgraph-" + jobGraph.getJobID(), ".bin"); try (ObjectOutputStream objectOut = new ObjectOutputStream( Files.newOutputStream(jobGraphFile))) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 740eb06a57b..e44174beb78 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@
[flink] 02/02: [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7997cf152cc59b45c51977b825c379c0421b575e Author: Yuxin Tan AuthorDate: Fri Sep 15 10:32:34 2023 +0800 [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage --- .../tiered/common/TieredStorageConfiguration.java | 10 ++ .../hybrid/tiered/tier/disk/DiskCacheManager.java | 26 +++ .../hybrid/tiered/tier/disk/DiskTierFactory.java | 5 +++ .../tiered/tier/disk/DiskTierProducerAgent.java| 2 ++ .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++ .../tier/disk/DiskTierProducerAgentTest.java | 1 + 6 files changed, 75 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java index d031d7278c4..abc6c955d53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java @@ -67,6 +67,8 @@ public class TieredStorageConfiguration { private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L; +private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024; + private final String remoteStorageBasePath; private final int tieredStorageBufferSize; @@ -330,6 +332,8 @@ public class TieredStorageConfiguration { private long numRetainedInMemoryRegionsMax = DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY; +private int maxCachedBytesBeforeFlush = DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH; + private List tierFactories; private List tierExclusiveBuffers; @@ -416,6 +420,11 @@ public class 
TieredStorageConfiguration { return this; } +public Builder setMaxCachedBytesBeforeFlush(int maxCachedBytesBeforeFlush) { +this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; +return this; +} + public TieredStorageConfiguration build() { setupTierFactoriesAndExclusiveBuffers(); return new TieredStorageConfiguration( @@ -451,6 +460,7 @@ public class TieredStorageConfiguration { tieredStorageBufferSize, minReserveDiskSpaceFraction, regionGroupSizeInBytes, +maxCachedBytesBeforeFlush, numRetainedInMemoryRegionsMax)); tierExclusiveBuffers.add(diskTierExclusiveBuffers); if (remoteStorageBasePath != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java index f5d7f30e0c0..feb3a1eb1d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java @@ -41,6 +41,8 @@ class DiskCacheManager { private final int numSubpartitions; +private final int maxCachedBytesBeforeFlush; + private final PartitionFileWriter partitionFileWriter; private final SubpartitionDiskCacheManager[] subpartitionCacheManagers; @@ -48,13 +50,21 @@ class DiskCacheManager { /** Whether the current flush process has completed. */ private CompletableFuture hasFlushCompleted; +/** + * The number of all subpartition's cached bytes in the cache manager. Note that the counter can + * only be accessed by the task thread and does not require locks. 
+ */ +private int numCachedBytesCounter; + DiskCacheManager( TieredStoragePartitionId partitionId, int numSubpartitions, +int maxCachedBytesBeforeFlush, TieredStorageMemoryManager memoryManager, PartitionFileWriter partitionFileWriter) { this.partitionId = partitionId; this.numSubpartitions = numSubpartitions; +this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; this.partitionFileWriter = partitionFileWriter; this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions]; this.hasFlushCompleted = FutureUtils.completedVoidFuture(); @@ -81,6 +91,7 @@ class DiskCacheManager { */ void append(Buffer buffer, int subpartitionId) {
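The mechanism this commit adds can be summarized as threshold-based batching: `append` bumps a byte counter shared across all subpartitions, and a flush is triggered only once the counter reaches `maxCachedBytesBeforeFlush` (512 KiB by default, per `DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH` above). A minimal sketch of the idea, with illustrative names rather than Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class FlushThresholdSketch {
    private final int maxCachedBytesBeforeFlush;
    // Accessed only by the task thread, so no lock is needed
    // (same reasoning as the comment in DiskCacheManager).
    private int numCachedBytesCounter;
    // Stand-in for handing cached buffers to the partition file writer.
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    FlushThresholdSketch(int maxCachedBytesBeforeFlush) {
        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
    }

    void append(int numBytes) {
        numCachedBytesCounter += numBytes;
        // Flush only when enough bytes have accumulated, instead of on
        // every append -- this is what reduces the flush frequency.
        if (numCachedBytesCounter >= maxCachedBytesBeforeFlush) {
            flushedBatchSizes.add(numCachedBytesCounter);
            numCachedBytesCounter = 0;
        }
    }
}
```

Batching writes this way trades a little memory (up to the threshold per cache manager) for far fewer small disk writes.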
[flink] 01/02: [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59368aedad90cda46883ed0bd0b8fa44aa75c0fe
Author: Yuxin Tan
AuthorDate: Fri Sep 15 10:31:51 2023 +0800

    [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage
---
 .../hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
index e610926a55d..4a5e9af3ea0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java
@@ -75,6 +75,7 @@ public class HashSubpartitionBufferAccumulator {
     }

     public void close() {
+        finishCurrentWritingBufferIfNotEmpty();
         while (!unfinishedBuffers.isEmpty()) {
             unfinishedBuffers.poll().close();
         }
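The one-line hotfix above fixes an ordering bug in `close()`: the partially filled writing buffer must be finished (handed downstream) before the remaining unfinished buffers are recycled, otherwise its records are silently dropped. A self-contained sketch of that ordering, with `StringBuilder` standing in for a network buffer and illustrative names rather than Flink's actual API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AccumulatorCloseSketch {
    private StringBuilder currentWritingBuffer = new StringBuilder();
    private final Deque<StringBuilder> unfinishedBuffers = new ArrayDeque<>();
    // Stand-in for buffers emitted to the downstream consumer.
    final List<String> finishedBuffers = new ArrayList<>();

    void append(String record) {
        currentWritingBuffer.append(record);
    }

    private void finishCurrentWritingBufferIfNotEmpty() {
        if (currentWritingBuffer.length() > 0) {
            finishedBuffers.add(currentWritingBuffer.toString());
            currentWritingBuffer = new StringBuilder();
        }
    }

    public void close() {
        // The line the hotfix adds: without it, any record still sitting in
        // the current writing buffer is lost when close() recycles buffers.
        finishCurrentWritingBufferIfNotEmpty();
        while (!unfinishedBuffers.isEmpty()) {
            unfinishedBuffers.poll(); // recycle without emitting
        }
    }
}
```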
[flink] branch release-1.18 updated (9c1318ca7fa -> 7997cf152cc)
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 9c1318ca7fa [FLINK-15736][docs] Add Java compatibility page
     new 59368aedad9 [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage
     new 7997cf152cc [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++
 .../storage/HashSubpartitionBufferAccumulator.java |  1 +
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java    |  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java     | 37 ++
 .../tier/disk/DiskTierProducerAgentTest.java       |  1 +
 7 files changed, 76 insertions(+), 6 deletions(-)
[flink] 02/02: [FLINK-33050][docs] Add notice of data duplicates for RTAS in docs This closes #23389
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5e3edb98f1f485fdafd8ce1fd88a930af7cf829
Author: Tartarus0zm
AuthorDate: Wed Sep 20 11:55:54 2023 +0800

    [FLINK-33050][docs] Add notice of data duplicates for RTAS in docs

    This closes #23389
---
 docs/content.zh/docs/dev/table/sql/create.md | 1 +
 docs/content/docs/dev/table/sql/create.md    | 1 +
 2 files changed, 2 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 7024b457585..f6c8674b6c8 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -615,6 +615,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM source_table WHERE mod(id, 1
 * 暂不支持主键约束。

 **注意:** 默认情况下,RTAS 是非原子性的,这意味着如果在向表中插入数据时发生错误,该表不会被自动删除或还原成原来的表。
+**注意:** RTAS 会先删除表,然后创建表并写入数据。但如果表是在基于内存的 Catalog 里,删除表只会将其从 Catalog 里移除,并不会移除物理表中的数据。因此,执行 RTAS 语句之前的数据仍然存在。

 ### 原子性

diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 8dc17cd525d..3d94b111c53 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -616,6 +616,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM source_table WHERE mod(id, 1
 * Does not support specifying primary key constraints yet.

 **Note:** By default, RTAS is non-atomic, which means the table won't be dropped or restored to its origin automatically if errors occur while inserting data into the table.
+**Note:** RTAS will drop the table first, then create the table and insert the data. But if the table is in the in-memory catalog, dropping the table will only remove it from the catalog without removing the data in the physical table. So the data that existed before the RTAS statement will still be there.

 ### Atomicity
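The duplicate-data caveat documented above follows from the catalog/storage split: with an in-memory catalog, "drop table" removes only the metadata entry, while the rows live on in external physical storage, so a non-atomic RTAS appends the new rows next to the old ones. A toy sketch of that failure mode (a map stands in for external storage; nothing here is Flink code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RtasDuplicateSketch {
    // Physical rows live outside the catalog (think Kafka topics or S3
    // files); the in-memory catalog tracks only table metadata.
    final Map<String, List<String>> physicalStorage = new HashMap<>();
    final Set<String> inMemoryCatalog = new HashSet<>();

    void replaceTableAs(String table, List<String> newRows) {
        inMemoryCatalog.remove(table); // "drop": metadata only, rows survive
        inMemoryCatalog.add(table);    // "create": metadata only
        physicalStorage
                .computeIfAbsent(table, t -> new ArrayList<>())
                .addAll(newRows);      // insert lands next to the old rows
    }
}
```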
[flink] 01/02: [FLINK-33050][table] Atomicity is not supported prompting the user to disable
This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0e97ae3549f77578b36ee9d6da97d98fc6e76fa4 Author: Tartarus0zm AuthorDate: Wed Sep 20 11:55:39 2023 +0800 [FLINK-33050][table] Atomicity is not supported prompting the user to disable --- .../table/api/internal/TableEnvironmentImpl.java | 36 ++ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1b9648f6cb6..d25df607fd1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -143,6 +143,7 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN */ @Internal public class TableEnvironmentImpl implements TableEnvironmentInternal { + // Flag that tells if the TableSource/TableSink used in this environment is stream table // source/sink, // and this should always be true. This avoids too many hard code. 
@@ -951,18 +952,29 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { createTableOperation.getTableIdentifier(), catalogTable, createTableOperation.isTemporary())) { -DynamicTableSink dynamicTableSink = -ExecutableOperationUtils.createDynamicTableSink( -catalog, -() -> moduleManager.getFactory((Module::getTableSinkFactory)), -createTableOperation.getTableIdentifier(), -catalogTable, -Collections.emptyMap(), -tableConfig, -resourceManager.getUserClassLoader(), -createTableOperation.isTemporary()); -if (dynamicTableSink instanceof SupportsStaging) { -return Optional.of(dynamicTableSink); +try { +DynamicTableSink dynamicTableSink = +ExecutableOperationUtils.createDynamicTableSink( +catalog, +() -> moduleManager.getFactory((Module::getTableSinkFactory)), +createTableOperation.getTableIdentifier(), +catalogTable, +Collections.emptyMap(), +tableConfig, +resourceManager.getUserClassLoader(), +createTableOperation.isTemporary()); +if (dynamicTableSink instanceof SupportsStaging) { +return Optional.of(dynamicTableSink); +} +} catch (Exception e) { +throw new TableException( +String.format( +"Fail to create DynamicTableSink for the table %s, " ++ "maybe the table does not support atomicity of CTAS/RTAS, " ++ "please set %s to false and try again.", +createTableOperation.getTableIdentifier(), + TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()), +e); } } }
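The pattern in the hunk above is straightforward: wrap sink creation in a try/catch so that a failure surfaces an actionable message (disable CTAS/RTAS atomicity and retry) instead of an opaque stack trace. A self-contained sketch, where the exception type and hard-coded option key are illustrative stand-ins for Flink's `TableException` and `TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED`:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class AtomicityHintSketch {
    // Probe whether the table's sink supports staged (atomic) writes.
    // If creation fails, rethrow with a hint pointing at the config
    // switch, preserving the original failure as the cause.
    static Optional<Object> tryCreateStagedSink(
            Supplier<Object> createSink, String tableId) {
        try {
            return Optional.ofNullable(createSink.get());
        } catch (Exception e) {
            throw new IllegalStateException(
                    String.format(
                            "Fail to create DynamicTableSink for the table %s, "
                                    + "maybe the table does not support atomicity of CTAS/RTAS, "
                                    + "please set table.rtas-ctas.atomicity-enabled to false and try again.",
                            tableId),
                    e);
        }
    }
}
```

Chaining the cause (`e`) matters here: the user-facing hint does not hide the underlying factory error needed for debugging.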
[flink] branch master updated (f2cb1d24728 -> d5e3edb98f1)
This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from f2cb1d24728 [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 (#23301)
     new 0e97ae3549f [FLINK-33050][table] Atomicity is not supported prompting the user to disable
     new d5e3edb98f1 [FLINK-33050][docs] Add notice of data duplicates for RTAS in docs This closes #23389

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 docs/content.zh/docs/dev/table/sql/create.md     |  1 +
 docs/content/docs/dev/table/sql/create.md        |  1 +
 .../table/api/internal/TableEnvironmentImpl.java | 36 ++
 3 files changed, 26 insertions(+), 12 deletions(-)
[flink] branch master updated: [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 (#23301)
This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f2cb1d24728 [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 (#23301) f2cb1d24728 is described below commit f2cb1d247283344e9194e63931a2948e09f73c93 Author: Zhanghao Chen AuthorDate: Thu Sep 21 08:45:59 2023 +0800 [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 (#23301) * [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to JUnit5 --- .../apache/flink/runtime/rpc/AsyncCallsTest.java | 44 .../flink/runtime/rpc/FencedRpcEndpointTest.java | 49 - .../flink/runtime/rpc/RpcConnectionTest.java | 7 +- .../apache/flink/runtime/rpc/RpcEndpointTest.java | 112 + .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java | 29 +++--- 5 files changed, 109 insertions(+), 132 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 0ffad0f098e..4be039e1340 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -20,11 +20,10 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.UUID; @@ -35,11 +34,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; 
-import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; -public class AsyncCallsTest extends TestLogger { +class AsyncCallsTest { // // shared test members @@ -49,14 +46,13 @@ public class AsyncCallsTest extends TestLogger { private static RpcService rpcService; -@BeforeClass -public static void setup() throws Exception { +@BeforeAll +static void setup() throws Exception { rpcService = RpcSystem.load().localServiceBuilder(new Configuration()).createAndStart(); } -@AfterClass -public static void shutdown() -throws InterruptedException, ExecutionException, TimeoutException { +@AfterAll +static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { rpcService.closeAsync().get(); } @@ -65,12 +61,12 @@ public class AsyncCallsTest extends TestLogger { // @Test -public void testScheduleWithNoDelay() throws Exception { +void testScheduleWithNoDelay() throws Exception { runScheduleWithNoDelayTest(TestEndpoint::new); } @Test -public void testFencedScheduleWithNoDelay() throws Exception { +void testFencedScheduleWithNoDelay() throws Exception { runScheduleWithNoDelayTest(FencedTestEndpoint::new); } @@ -117,22 +113,24 @@ public class AsyncCallsTest extends TestLogger { Duration.ofSeconds(30L)); String str = result.get(30, TimeUnit.SECONDS); -assertEquals("test", str); +assertThat(str).isEqualTo("test"); // validate that no concurrent access happened -assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get()); +assertThat(concurrentAccess) +.withFailMessage("Rpc Endpoint had concurrent access") +.isFalse(); } finally { RpcUtils.terminateRpcEndpoint(rpcEndpoint); } } @Test -public void testScheduleWithDelay() throws Exception { +void testScheduleWithDelay() throws Exception { runScheduleWithDelayTest(TestEndpoint::new); } @Test -public void testFencedScheduleWithDelay() throws Exception { +void 
testFencedScheduleWithDelay() throws Exception { runScheduleWithDelayTest(FencedTestEndpoint::new); } @@ -178,9 +176,13 @@ public class AsyncCallsTest extends TestLogger { final long stop = System.nanoTime(); // validate that no concurrent access happened -assertFalse("Rpc Endpoint had concurrent
[flink] branch release-1.18 updated: [FLINK-15736][docs] Add Java compatibility page
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 9c1318ca7fa [FLINK-15736][docs] Add Java compatibility page 9c1318ca7fa is described below commit 9c1318ca7fa5b2e7b11827068ad1288483aaa464 Author: Chesnay Schepler AuthorDate: Wed Sep 20 22:07:04 2023 +0200 [FLINK-15736][docs] Add Java compatibility page --- .../docs/deployment/java_compatibility.md | 77 ++ docs/content/docs/deployment/java_compatibility.md | 77 ++ docs/content/docs/deployment/memory/_index.md | 2 +- 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/docs/content.zh/docs/deployment/java_compatibility.md b/docs/content.zh/docs/deployment/java_compatibility.md new file mode 100644 index 000..f2e4cb93eb4 --- /dev/null +++ b/docs/content.zh/docs/deployment/java_compatibility.md @@ -0,0 +1,77 @@ +--- +title: Java Compatibility +weight: 2 +type: docs +--- + + +# Java compatibility + +This page lists which Java versions Flink supports and what limitations apply (if any). + +## Java 8 (deprecated) + +Support for Java 8 has been deprecated in 1.15.0. +It is recommended to migrate to Java 11. + +## Java 11 + +Support for Java 11 was added in 1.10.0 and is the recommended Java version to run Flink on. + +This is the default version for docker images. + +### Untested Flink features + +The following Flink features have not been tested with Java 11: + +* Hive connector +* Hbase 1.x connector + +### Untested language features + +* Modularized user jars have not been tested. + +## Java 17 + +Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736)) + +### Untested Flink features + +These Flink features have not been tested with Java 17: + +* Hive connector +* Hbase 1.x connector + +### JDK modularization + +Starting with Java 16 Java applications have to fully cooperate with the JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/). +This means that access to JDK classes/internal must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments. + +Since Flink uses reflection for serializing user-defined functions and data (via Kryo), this means that if your UDFs or data types use JDK classes you may have to allow access to these JDK classes. + +These should be configured via the [env.java.opts.all]({{< ref "docs/deployment/config" >}}#env-java-opts-all) option. + +In the default configuration in the Flink distribution this option is configured such that Flink itself works on Java 17. +The list of configured arguments must not be shortened, but only extended. + +### Known issues + +* Java records are not supported. See [FLINK-32380](https://issues.apache.org/jira/browse/FLINK-32380) for updates. +* SIGSEGV in C2 Compiler thread: Early Java 17 builds are affected by a bug where the JVM can fail abruptly. Update your Java 17 installation to resolve the issue. See [JDK-8277529](https://bugs.openjdk.org/browse/JDK-8277529) for details. diff --git a/docs/content/docs/deployment/java_compatibility.md b/docs/content/docs/deployment/java_compatibility.md new file mode 100644 index 000..f2e4cb93eb4 --- /dev/null +++ b/docs/content/docs/deployment/java_compatibility.md @@ -0,0 +1,77 @@ +--- +title: Java Compatibility +weight: 2 +type: docs +--- + + +# Java compatibility + +This page lists which Java versions Flink supports and what limitations apply (if any). + +## Java 8 (deprecated) + +Support for Java 8 has been deprecated in 1.15.0. 
+It is recommended to migrate to Java 11. + +## Java 11 + +Support for Java 11 was added in 1.10.0 and is the recommended Java version to run Flink on. + +This is the default version for docker images. + +### Untested Flink features + +The following Flink features have not been tested with Java 11: + +* Hive connector +* Hbase 1.x connector + +### Untested language features + +* Modularized user jars have not been tested. + +## Java 17 + +Experimental support for Java 17 was added in 1.18.0. ([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736)) + +### Untested Flink features + +These Flink features have not been tested with Java 17: + +* Hive connector +* Hbase 1.x connector + +### JDK modularization + +Starting with Java 16 Java applications have to fully cooperate with the JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/). +This means that access to JDK classes/internal must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments. + +Since Flink uses reflection for serializing user-defined functions and data (via Kryo), this means that if your UDFs or data types use JDK classes you may have to allow access to these JDK classes.
[flink] branch master updated: [FLINK-15736][docs] Add Java compatibility page
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 388601f1b75 [FLINK-15736][docs] Add Java compatibility page 388601f1b75 is described below commit 388601f1b75abd443e149aaa4584d366072a1b0e Author: Chesnay Schepler AuthorDate: Wed Sep 20 22:07:04 2023 +0200 [FLINK-15736][docs] Add Java compatibility page --- .../docs/deployment/java_compatibility.md | 77 ++ docs/content/docs/deployment/java_compatibility.md | 77 ++ docs/content/docs/deployment/memory/_index.md | 2 +- 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/docs/content.zh/docs/deployment/java_compatibility.md b/docs/content.zh/docs/deployment/java_compatibility.md new file mode 100644 index 000..f2e4cb93eb4 --- /dev/null +++ b/docs/content.zh/docs/deployment/java_compatibility.md @@ -0,0 +1,77 @@ +--- +title: Java Compatibility +weight: 2 +type: docs +--- + + +# Java compatibility + +This page lists which Java versions Flink supports and what limitations apply (if any). + +## Java 8 (deprecated) + +Support for Java 8 has been deprecated in 1.15.0. +It is recommended to migrate to Java 11. + +## Java 11 + +Support for Java 11 was added in 1.10.0 and is the recommended Java version to run Flink on. + +This is the default version for docker images. + +### Untested Flink features + +The following Flink features have not been tested with Java 11: + +* Hive connector +* Hbase 1.x connector + +### Untested language features + +* Modularized user jars have not been tested. + +## Java 17 + +Experimental support for Java 17 was added in 1.18.0. 
([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736)) + +### Untested Flink features + +These Flink features have not been tested with Java 17: + +* Hive connector +* Hbase 1.x connector + +### JDK modularization + +Starting with Java 16 Java applications have to fully cooperate with the JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/). +This means that access to JDK classes/internal must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments. + +Since Flink uses reflection for serializing user-defined functions and data (via Kryo), this means that if your UDFs or data types use JDK classes you may have to allow access to these JDK classes. + +These should be configured via the [env.java.opts.all]({{< ref "docs/deployment/config" >}}#env-java-opts-all) option. + +In the default configuration in the Flink distribution this option is configured such that Flink itself works on Java 17. +The list of configured arguments must not be shortened, but only extended. + +### Known issues + +* Java records are not supported. See [FLINK-32380](https://issues.apache.org/jira/browse/FLINK-32380) for updates. +* SIGSEGV in C2 Compiler thread: Early Java 17 builds are affected by a bug where the JVM can fail abruptly. Update your Java 17 installation to resolve the issue. See [JDK-8277529](https://bugs.openjdk.org/browse/JDK-8277529) for details. diff --git a/docs/content/docs/deployment/java_compatibility.md b/docs/content/docs/deployment/java_compatibility.md new file mode 100644 index 000..f2e4cb93eb4 --- /dev/null +++ b/docs/content/docs/deployment/java_compatibility.md @@ -0,0 +1,77 @@ +--- +title: Java Compatibility +weight: 2 +type: docs +--- + + +# Java compatibility + +This page lists which Java versions Flink supports and what limitations apply (if any). + +## Java 8 (deprecated) + +Support for Java 8 has been deprecated in 1.15.0. 
+It is recommended to migrate to Java 11. + +## Java 11 + +Support for Java 11 was added in 1.10.0 and is the recommended Java version to run Flink on. + +This is the default version for docker images. + +### Untested Flink features + +The following Flink features have not been tested with Java 11: + +* Hive connector +* Hbase 1.x connector + +### Untested language features + +* Modularized user jars have not been tested. + +## Java 17 + +Experimental support for Java 17 was added in 1.18.0. ([FLINK-15736](https://issues.apache.org/jira/browse/FLINK-15736)) + +### Untested Flink features + +These Flink features have not been tested with Java 17: + +* Hive connector +* Hbase 1.x connector + +### JDK modularization + +Starting with Java 16 Java applications have to fully cooperate with the JDK modularization, also known as [Project Jigsaw](https://openjdk.org/projects/jigsaw/). +This means that access to JDK classes/internal must be explicitly allowed by the application when it is started, on a per-module basis, in the form of --add-opens/--add-exports JVM arguments. + +Since Flink uses reflection for serializing user-defined functions and data (via Kryo), this means that if your UDFs or data types use JDK classes you may have to allow access to these JDK classes.
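[Digest note] The `env.java.opts.all`/`--add-opens` mechanism described in the new compatibility page typically surfaces as a configuration entry like the following. This is an illustrative fragment only: the actual default configuration shipped with the Flink distribution contains the full (much longer) list of flags, and the two module opens shown here are examples, not the complete set.

```yaml
# Illustrative flink-conf.yaml fragment for running Flink itself on Java 17.
# The distribution's default config carries the complete flag list; per the
# docs page, that list may only be extended (e.g. for your own UDF types),
# never shortened.
env.java.opts.all: >-
  --add-opens=java.base/java.util=ALL-UNNAMED
  --add-opens=java.base/java.lang=ALL-UNNAMED
```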
[flink] branch master updated (5e3abe28f62 -> 5a8321f6385)
This is an automated email from the ASF dual-hosted git repository. huweihua pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 5e3abe28f62 [FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-planner (JsonPlanTestBase) (#23353) add db53ca19361 [hotfix][docs] Update the parameter type of JobVertex#finalizeOnMaster in JavaDocs add 5a8321f6385 [FLINK-32846][runtime][JUnit5 Migration] The metrics package of flink-runtime module No new revisions were added by this update. Summary of changes: .../executiongraph/ExecutionGraphTestUtils.java| 2 +- .../executiongraph/FinalizeOnMasterTest.java | 4 +- .../DescriptiveStatisticsHistogramTest.java| 7 +- .../runtime/metrics/MetricRegistryImplTest.java| 3 - .../flink/runtime/metrics/ReporterSetupTest.java | 89 - .../flink/runtime/metrics/ThresholdMeterTest.java | 60 +++--- .../flink/runtime/metrics/TimerGaugeTest.java | 65 +++ .../metrics/dump/MetricDumpSerializerTest.java | 2 +- .../flink/runtime/metrics/dump/MetricDumpTest.java | 67 +++ .../metrics/dump/MetricQueryServiceTest.java | 78 .../metrics/filter/DefaultMetricFilterTest.java| 68 +++ .../metrics/groups/AbstractMetricGroupTest.java| 132 ++--- .../metrics/groups/FrontMetricGroupTest.java | 38 ++-- .../metrics/groups/InternalOperatorGroupTest.java | 88 - .../metrics/groups/JobManagerGroupTest.java| 43 ++--- .../metrics/groups/JobManagerJobGroupTest.java | 41 ++-- .../groups/JobManagerOperatorGroupTest.java| 6 +- .../groups/MetricGroupRegistrationTest.java| 26 ++- .../runtime/metrics/groups/MetricGroupTest.java| 213 - .../metrics/groups/TaskIOMetricGroupTest.java | 2 +- .../metrics/groups/TaskManagerGroupTest.java | 80 .../metrics/groups/TaskManagerJobGroupTest.java| 57 +++--- .../metrics/groups/TaskManagerMetricGroupTest.java | 36 ++-- .../metrics/groups/TaskMetricGroupTest.java| 105 +- .../runtime/metrics/util/MetricUtilsTest.java | 80 .../metrics/utils/SystemResourcesCounterTest.java | 23 +-- 26 files changed, 
680 insertions(+), 735 deletions(-)
[flink] 01/02: [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 887c66a93cf3db5c6be0461f4ce4b1be26b6c5c6 Author: Yuxin Tan AuthorDate: Fri Sep 15 10:31:51 2023 +0800 [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage --- .../hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java index e610926a55d..4a5e9af3ea0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/HashSubpartitionBufferAccumulator.java @@ -75,6 +75,7 @@ public class HashSubpartitionBufferAccumulator { } public void close() { +finishCurrentWritingBufferIfNotEmpty(); while (!unfinishedBuffers.isEmpty()) { unfinishedBuffers.poll().close(); }
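[Digest note] The one-line hotfix above is about ordering in `close()`: the partially filled "current" writing buffer has to be finished (moved into the queue of completed buffers) before that queue is drained, otherwise its data is silently dropped. A minimal standalone sketch of the pattern — this is not the actual Flink class; the names and `String` buffers are illustrative only:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative accumulator mirroring the close() ordering fixed in the hotfix. */
class BufferAccumulatorSketch {
    private final StringBuilder currentBuffer = new StringBuilder();
    private final Deque<String> finishedBuffers = new ArrayDeque<>();

    void append(String data) {
        currentBuffer.append(data);
    }

    /** Moves the partially written buffer into the finished queue, if non-empty. */
    private void finishCurrentBufferIfNotEmpty() {
        if (currentBuffer.length() > 0) {
            finishedBuffers.add(currentBuffer.toString());
            currentBuffer.setLength(0);
        }
    }

    /** Mirrors the fixed close(): finish the current buffer first, then drain. */
    int close() {
        finishCurrentBufferIfNotEmpty(); // without this line, pending data is lost
        int drained = 0;
        while (!finishedBuffers.isEmpty()) {
            finishedBuffers.poll();
            drained++;
        }
        return drained;
    }
}
```

Without the first line of `close()`, data appended since the last finished buffer would never reach the drain loop — exactly the gap the commit plugs.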
[flink] branch master updated (5a8321f6385 -> 6034d5ff335)
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 5a8321f6385 [FLINK-32846][runtime][JUnit5 Migration] The metrics package of flink-runtime module add 6034d5ff335 [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient No new revisions were added by this update. Summary of changes: .../client/program/rest/RestClusterClient.java | 35 -- .../client/program/rest/RestClusterClientTest.java | 31 ++- 2 files changed, 22 insertions(+), 44 deletions(-)
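[Digest note] The change above avoids allocating a fresh temporary directory on every request. A sketch of the lazy create-once, reuse-thereafter pattern — the class and method names here are illustrative, not the `RestClusterClient` API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative cache: create the temp directory once, hand out the same path after. */
class TempDirCache {
    private Path cachedDir;

    synchronized Path getOrCreate() throws IOException {
        if (cachedDir == null) {
            // Only the first caller pays the filesystem cost.
            cachedDir = Files.createTempDirectory("rest-client-");
        }
        return cachedDir;
    }
}
```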
[flink] 02/02: [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b076c52d8da914e81c3e004c0b0c7883463bb151 Author: Yuxin Tan AuthorDate: Fri Sep 15 10:32:34 2023 +0800 [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage --- .../tiered/common/TieredStorageConfiguration.java | 10 ++ .../hybrid/tiered/tier/disk/DiskCacheManager.java | 26 +++ .../hybrid/tiered/tier/disk/DiskTierFactory.java | 5 +++ .../tiered/tier/disk/DiskTierProducerAgent.java| 2 ++ .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++ .../tier/disk/DiskTierProducerAgentTest.java | 1 + 6 files changed, 75 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java index d031d7278c4..abc6c955d53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java @@ -67,6 +67,8 @@ public class TieredStorageConfiguration { private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L; +private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024; + private final String remoteStorageBasePath; private final int tieredStorageBufferSize; @@ -330,6 +332,8 @@ public class TieredStorageConfiguration { private long numRetainedInMemoryRegionsMax = DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY; +private int maxCachedBytesBeforeFlush = DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH; + private List tierFactories; private List tierExclusiveBuffers; @@ -416,6 +420,11 @@ public class 
TieredStorageConfiguration { return this; } +public Builder setMaxCachedBytesBeforeFlush(int maxCachedBytesBeforeFlush) { +this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; +return this; +} + public TieredStorageConfiguration build() { setupTierFactoriesAndExclusiveBuffers(); return new TieredStorageConfiguration( @@ -451,6 +460,7 @@ public class TieredStorageConfiguration { tieredStorageBufferSize, minReserveDiskSpaceFraction, regionGroupSizeInBytes, +maxCachedBytesBeforeFlush, numRetainedInMemoryRegionsMax)); tierExclusiveBuffers.add(diskTierExclusiveBuffers); if (remoteStorageBasePath != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java index f5d7f30e0c0..feb3a1eb1d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java @@ -41,6 +41,8 @@ class DiskCacheManager { private final int numSubpartitions; +private final int maxCachedBytesBeforeFlush; + private final PartitionFileWriter partitionFileWriter; private final SubpartitionDiskCacheManager[] subpartitionCacheManagers; @@ -48,13 +50,21 @@ class DiskCacheManager { /** Whether the current flush process has completed. */ private CompletableFuture hasFlushCompleted; +/** + * The number of all subpartition's cached bytes in the cache manager. Note that the counter can + * only be accessed by the task thread and does not require locks. 
+ */ +private int numCachedBytesCounter; + DiskCacheManager( TieredStoragePartitionId partitionId, int numSubpartitions, +int maxCachedBytesBeforeFlush, TieredStorageMemoryManager memoryManager, PartitionFileWriter partitionFileWriter) { this.partitionId = partitionId; this.numSubpartitions = numSubpartitions; +this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; this.partitionFileWriter = partitionFileWriter; this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions]; this.hasFlushCompleted = FutureUtils.completedVoidFuture(); @@ -81,6 +91,7 @@ class DiskCacheManager { */ void append(Buffer buffer, int subpartitionId) {
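[Digest note] The batching idea behind FLINK-33044 is visible in the diff: a per-manager byte counter accumulates across subpartitions and a flush is only triggered once it crosses `maxCachedBytesBeforeFlush` (512 KiB by default), instead of flushing on every append. A self-contained sketch of that threshold logic — names are illustrative, not Flink's API:

```java
/** Illustrative threshold-based flusher: count cached bytes, flush past the limit. */
class FlushThresholdSketch {
    private final int maxCachedBytesBeforeFlush;
    private int numCachedBytes;
    private int numFlushes;

    FlushThresholdSketch(int maxCachedBytesBeforeFlush) {
        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
    }

    void append(int bufferBytes) {
        numCachedBytes += bufferBytes;
        // Flush only when the accumulated bytes cross the threshold,
        // rather than on every single append.
        if (numCachedBytes >= maxCachedBytesBeforeFlush) {
            flush();
        }
    }

    private void flush() {
        numFlushes++;
        numCachedBytes = 0; // counter resets once the cached data is handed off
    }

    int flushCount() {
        return numFlushes;
    }
}
```

With a 512-byte threshold, five appends of 200 bytes trigger exactly one flush (at the third append, when the counter reaches 600) rather than five.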
[flink] branch master updated (6034d5ff335 -> b076c52d8da)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 6034d5ff335 [FLINK-32974][client] Avoid creating a new temporary directory every time for RestClusterClient new 887c66a93cf [hotfix][network] Flush writing buffers when closing HashSubpartitionBufferAccumulator of tiered storage new b076c52d8da [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../tiered/common/TieredStorageConfiguration.java | 10 ++ .../storage/HashSubpartitionBufferAccumulator.java | 1 + .../hybrid/tiered/tier/disk/DiskCacheManager.java | 26 +++ .../hybrid/tiered/tier/disk/DiskTierFactory.java | 5 +++ .../tiered/tier/disk/DiskTierProducerAgent.java| 2 ++ .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++ .../tier/disk/DiskTierProducerAgentTest.java | 1 + 7 files changed, 76 insertions(+), 6 deletions(-)