[flink] branch master updated (2ed858318f3 -> 50952050057)
This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 2ed858318f3 [FLINK-32345][state] Improve parallel download of RocksDB incremental state. (#22788)
     add 50952050057 [FLINK-32205][rest] Add the option "rest.url-prefix" to support connecting to the server through a proxy.

No new revisions were added by this update.

Summary of changes:
 docs/layouts/shortcodes/generated/rest_configuration.html | 6 ++
 .../java/org/apache/flink/configuration/RestOptions.java | 11 +++
 .../java/org/apache/flink/runtime/rest/RestClient.java | 14 +++---
 3 files changed, 28 insertions(+), 3 deletions(-)
[flink] branch master updated: [FLINK-32345][state] Improve parallel download of RocksDB incremental state. (#22788)
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 2ed858318f3 [FLINK-32345][state] Improve parallel download of RocksDB incremental state. (#22788)
2ed858318f3 is described below

commit 2ed858318f38eb9913e5f4b3019be6b6d0a8e6fb
Author: Stefan Richter
AuthorDate: Wed Jun 21 12:17:08 2023 +0200

    [FLINK-32345][state] Improve parallel download of RocksDB incremental state. (#22788)

    * [FLINK-32345] Improve parallel download of RocksDB incremental state.

    This commit improves RocksDBStateDownloader to support parallelized state download across multiple state types and across multiple state handles. This can improve our download times for scale-in.
---
 .../java/org/apache/flink/util/CollectionUtil.java | 81 +++
 .../org/apache/flink/util/CollectionUtilTest.java | 47 ++
 .../state/RocksDBIncrementalCheckpointUtils.java | 2 +-
 .../streaming/state/RocksDBStateDownloader.java | 121 +---
 .../streaming/state/StateHandleDownloadSpec.java | 49 +++
 .../RocksDBIncrementalRestoreOperation.java | 157 -
 .../state/RocksDBStateDownloaderTest.java | 132 +
 7 files changed, 438 insertions(+), 151 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index 8c96e3e3554..18f4c4313c4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 
 import javax.annotation.Nullable;
 
@@ -27,7 +28,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +49,9 @@ public final class CollectionUtil {
     /** A safe maximum size for arrays in the JVM. */
     public static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
+    /** The default load factor for hash maps create with this util class. */
+    static final float HASH_MAP_DEFAULT_LOAD_FACTOR = 0.75f;
+
     private CollectionUtil() {
         throw new AssertionError();
     }
@@ -133,4 +140,78 @@ public final class CollectionUtil {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(
+                computeRequiredCapacity(expectedSize, HASH_MAP_DEFAULT_LOAD_FACTOR),
+                HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize, HASH_MAP_DEFAULT_LOAD_FACTOR),
+                HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(
+                computeRequiredCapacity(expectedSize, HASH_MAP_DEFAULT_LOAD_FACTOR),
+                HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size,
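
The Javadoc in the diff above promises collections that will not rehash while expectedSize elements are inserted, "considering the load factor". The body of computeRequiredCapacity is not part of this excerpt, so the following is only a minimal sketch of how such a capacity could be derived. The class ExpectedSizeSketch, the helpers requiredCapacity and newMapWithExpectedSize, and the clamp to 2^30 are hypothetical names and choices made for illustration, not Flink's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; this is not Flink's CollectionUtil. */
final class ExpectedSizeSketch {

    /** Same value as HASH_MAP_DEFAULT_LOAD_FACTOR in the diff above. */
    static final float LOAD_FACTOR = 0.75f;

    /** Largest table size java.util.HashMap allocates (2^30). */
    static final int MAX_CAPACITY = 1 << 30;

    /**
     * Hypothetical helper: the smallest capacity c with c * loadFactor >= expectedSize,
     * clamped to MAX_CAPACITY. Assumes loadFactor > 0.
     */
    static int requiredCapacity(int expectedSize, float loadFactor) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be >= 0");
        }
        long capacity = (long) Math.ceil(expectedSize / (double) loadFactor);
        return (int) Math.min(capacity, MAX_CAPACITY);
    }

    /** Hypothetical factory mirroring the shape of newHashMapWithExpectedSize. */
    static <K, V> Map<K, V> newMapWithExpectedSize(int expectedSize) {
        return new HashMap<>(requiredCapacity(expectedSize, LOAD_FACTOR), LOAD_FACTOR);
    }

    public static void main(String[] args) {
        Map<String, Integer> map = newMapWithExpectedSize(12);
        for (int i = 0; i < 12; i++) {
            map.put("key-" + i, i);
        }
        System.out.println("capacity=" + requiredCapacity(12, LOAD_FACTOR) + " size=" + map.size());
    }
}
```

With an expected size of 12 and a load factor of 0.75, the sketch requests a capacity of 16. java.util.HashMap resizes once its size exceeds capacity times load factor (here 12), so the twelve insertions fit without a rehash. HashMap also rounds the requested capacity up to a power of two, which can only raise that threshold.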
[flink] 01/03: [hotfix][runtime] Adds helper method that can specify a parent path that is used for the ZooKeeperRetrievalDriver
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef9f43847e7f8ee3e6d2d8948d79cf8d95264fe3
Author: Matthias Pohl
AuthorDate: Thu May 25 16:11:02 2023 +0200

    [hotfix][runtime] Adds helper method that can specify a parent path that is used for the ZooKeeperRetrievalDriver

    Signed-off-by: Matthias Pohl
---
 .../java/org/apache/flink/runtime/util/ZooKeeperUtils.java | 14 +-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index bfbb10e59e8..8312b5136d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -366,7 +366,19 @@ public class ZooKeeperUtils {
      */
     public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
             final CuratorFramework client) {
-        return createLeaderRetrievalDriverFactory(client, "", new Configuration());
+        return createLeaderRetrievalDriverFactory(client, "");
+    }
+
+    /**
+     * Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper.
+     *
+     * @param client The {@link CuratorFramework} ZooKeeper client to use
+     * @param path The parent path that shall be used by the client.
+     * @return {@link LeaderRetrievalDriverFactory} instance.
+     */
+    public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
+            final CuratorFramework client, String path) {
+        return createLeaderRetrievalDriverFactory(client, path, new Configuration());
     }
 
     /**
[flink] 02/03: [hotfix][runtime] Improves initial logging for ZK leader retrieval and election
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit effcec820702dacf79ce97fa8586ba6757e5b862
Author: Matthias Pohl
AuthorDate: Thu May 25 16:12:06 2023 +0200

    [hotfix][runtime] Improves initial logging for ZK leader retrieval and election

    Signed-off-by: Matthias Pohl
---
 .../ZooKeeperMultipleComponentLeaderElectionDriver.java | 11 +++
 .../leaderretrieval/ZooKeeperLeaderRetrievalDriver.java | 5 +
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
index c028f29ec95..e6cc17719b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
@@ -138,10 +138,6 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
     public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
         Preconditions.checkState(running.get());
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Write leader information {} for {}.", leaderInformation, componentId);
-        }
-
         if (!leaderLatch.hasLeadership()) {
             return;
         }
@@ -149,6 +145,13 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
         final String connectionInformationPath =
                 ZooKeeperUtils.generateConnectionInformationPath(componentId);
 
+        LOG.debug(
+                "Write leader information {} for {} to {}.",
+                leaderInformation,
+                componentId,
+                ZooKeeperUtils.generateZookeeperPath(
+                        curatorFramework.getNamespace(), connectionInformationPath));
+
         try {
             ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                     leaderInformation,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
index 47182063210..ec1908e3185 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
@@ -101,6 +101,11 @@ public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver {
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
 
+        LOG.debug(
+                "Monitoring data change in {}",
+                ZooKeeperUtils.generateZookeeperPath(
+                        client.getNamespace(), connectionInformationPath));
+
         running = true;
     }
[flink] 03/03: [FLINK-30342][test] Migrates ZooKeeperLeaderElectionTest to MultipleComponentLeaderElectionDriver
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c4c633e843920bbf18b2ac2242ee48ada408f6ac
Author: Matthias Pohl
AuthorDate: Mon Jun 19 18:17:10 2023 +0200

    [FLINK-30342][test] Migrates ZooKeeperLeaderElectionTest to MultipleComponentLeaderElectionDriver

    It also removes ZooKeeperLeaderElectionTest.testLeaderShouldBeCorrectedWhenOverwritten:
    It is still valid because it tests the communication between the ZK LeaderElectionDriver
    implementation and the ZK backend for a leadership change. But this specific code path
    is also covered by ZooKeeperMultipleComponentLeaderElectionDriverTest.testLeaderInformationChange

    Signed-off-by: Matthias Pohl
---
 .../apache/flink/runtime/util/ZooKeeperUtils.java | 3 +
 .../runtime/leaderelection/LeaderElectionTest.java | 1 +
 .../ZooKeeperLeaderElectionTest.java | 375 ++---
 3 files changed, 114 insertions(+), 265 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 8312b5136d1..b1e3917144d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -415,6 +415,7 @@ public class ZooKeeperUtils {
      * @param client The {@link CuratorFramework} ZooKeeper client to use
      * @return {@link DefaultLeaderElectionService} instance.
      */
+    @Deprecated
     public static DefaultLeaderElectionService createLeaderElectionService(CuratorFramework client)
             throws Exception {
 
@@ -429,6 +430,7 @@ public class ZooKeeperUtils {
      * @param path The path for the leader election
      * @return {@link DefaultLeaderElectionService} instance.
      */
+    @Deprecated
     public static DefaultLeaderElectionService createLeaderElectionService(
             final CuratorFramework client, final String path) throws Exception {
         final DefaultLeaderElectionService leaderElectionService =
@@ -444,6 +446,7 @@ public class ZooKeeperUtils {
      * @param client The {@link CuratorFramework} ZooKeeper client to use
      * @return {@link LeaderElectionDriverFactory} instance.
      */
+    @Deprecated
     public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory(
             final CuratorFramework client) {
         return createLeaderElectionDriverFactory(client, "");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index 9cc66385187..dcccfe2890f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -155,6 +155,7 @@ public class LeaderElectionTest {
         LeaderElection createLeaderElection() throws Exception;
     }
 
+    @Deprecated
     private static final class ZooKeeperServiceClass implements ServiceClass {
 
         private TestingServer testingServer;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index c7d58f3dcd9..013530f81c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.testutils.EachCallbackWrapper;
-import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
@@ -46,7 +45,6 @@ import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cac
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.flink.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
-import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -57,22 +55,16 @@ import org.mockito.Mockito;
 imp
[flink] branch master updated (5553ea78488 -> c4c633e8439)
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 5553ea78488 [FLINK-30343][runtime] Migrates KubernetesLeaderElectionAndRetrievalITCase to the MultipleComponentLeaderElectionDriver interface
     new ef9f43847e7 [hotfix][runtime] Adds helper method that can specify a parent path that is used for the ZooKeeperRetrievalDriver
     new effcec82070 [hotfix][runtime] Improves initial logging for ZK leader retrieval and election
     new c4c633e8439 [FLINK-30342][test] Migrates ZooKeeperLeaderElectionTest to MultipleComponentLeaderElectionDriver

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 ...eeperMultipleComponentLeaderElectionDriver.java | 11 +-
 .../ZooKeeperLeaderRetrievalDriver.java | 5 +
 .../apache/flink/runtime/util/ZooKeeperUtils.java | 17 +-
 .../runtime/leaderelection/LeaderElectionTest.java | 1 +
 .../ZooKeeperLeaderElectionTest.java | 375 ++---
 5 files changed, 139 insertions(+), 270 deletions(-)
[flink-web] branch hotfix-contribute-code deleted (was 4d66be189)
This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a change to branch hotfix-contribute-code
in repository https://gitbox.apache.org/repos/asf/flink-web.git

     was 4d66be189 [hotfix] Fix the wrong conflic resolve

This change permanently discards the following revisions:

 discard 4d66be189 [hotfix] Fix the wrong conflic resolve
[flink-web] branch hotfix-contribute-code created (now 4d66be189)
This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a change to branch hotfix-contribute-code
in repository https://gitbox.apache.org/repos/asf/flink-web.git

      at 4d66be189 [hotfix] Fix the wrong conflic resolve

This branch includes the following new commits:

     new 4d66be189 [hotfix] Fix the wrong conflic resolve

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[flink-web] 01/01: [hotfix] Fix the wrong conflic resolve
This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch hotfix-contribute-code
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 4d66be189b1201ec1a73a3869a8d13051be6c880
Author: congxianqiu
AuthorDate: Wed Jun 21 15:36:37 2023 +0800

    [hotfix] Fix the wrong conflic resolve
---
 docs/content/how-to-contribute/contribute-code.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/how-to-contribute/contribute-code.md b/docs/content/how-to-contribute/contribute-code.md
index d60dd31fc..e38318c5d 100644
--- a/docs/content/how-to-contribute/contribute-code.md
+++ b/docs/content/how-to-contribute/contribute-code.md
@@ -106,7 +106,7 @@ to the Flink project and want to learn about it and its contribution process, yo
 2Implement
-Implement the change according to the }}">Code Style and Quality Guide and the approach agreed upon in the Jira ticket.
+Implement the change according to the }}">Code Style and Quality Guide and the approach agreed upon in the Jira ticket.
 Only start working on the implementation if there is consensus on the approach (e.g. you are assigned to the ticket)
@@ -204,7 +204,7 @@ Once you've been assigned to a Jira issue, you may start to implement the requir
 Here are some further points to keep in mind while implementing:
 
 - [Set up a Flink development environment](https://cwiki.apache.org/confluence/display/FLINK/Setting+up+a+Flink+development+environment)
-- Follow the [Code Style and Quality Guide]({{< relref "how-to-contribute/contribute-code" >}}) of Flink
+- Follow the [Code Style and Quality Guide]({{< relref "how-to-contribute/code-style-and-quality-preamble" >}}) of Flink
 - Take any discussions and requirements from the Jira issue or design document into account.
 - Do not mix unrelated issues into one contribution.