[flink-connector-pulsar] 01/01: [FLINK-32003] Upgrade pulsar-client version to work with OAuth2
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch FLINK-32003
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 59303cbb9b0e5602d05e47c1825bf5f04baec4b2
Author: tison
AuthorDate: Sat May 6 13:53:41 2023 +0800

    [FLINK-32003] Upgrade pulsar-client version to work with OAuth2
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2fa2d4c..9730c3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ under the License.
 1.16.0
-2.10.1
+2.10.2
 2.13.4.20221013
 1.45.1
@@ -689,4 +689,4 @@ under the License.
-
\ No newline at end of file
+
[flink-connector-pulsar] branch FLINK-32003 created (now 59303cb)
This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch FLINK-32003
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

      at 59303cb [FLINK-32003] Upgrade pulsar-client version to work with OAuth2

This branch includes the following new commits:

     new 59303cb [FLINK-32003] Upgrade pulsar-client version to work with OAuth2

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[flink] 03/03: [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3379c2c085da43bb452536c981a7fc13f39482ee Author: Weijie Guo AuthorDate: Fri Apr 21 17:35:23 2023 +0800 [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool. --- .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++ .../io/network/buffer/LocalBufferPoolTest.java | 21 +-- 2 files changed, 35 insertions(+), 58 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 190734c35b4..6d6d236f902 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -49,11 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; * * The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match - * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments + - * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In - * order to meet this requirement, when the size of this pool changes, - * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to - * each other. + * its new size. + * + * New buffers can be requested only when {@code numberOfRequestedMemorySegments < + * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the + * currentPoolSize will be dynamically regarded as overdraft buffers. 
* * Availability is defined as returning a non-overdraft segment on a subsequent {@link * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link @@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool { private int maxOverdraftBuffersPerGate; -@GuardedBy("availableMemorySegments") -private int numberOfRequestedOverdraftMemorySegments; - @GuardedBy("availableMemorySegments") private boolean isDestroyed; @@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool { } } -@VisibleForTesting -public int getNumberOfRequestedOverdraftMemorySegments() { -synchronized (availableMemorySegments) { -return numberOfRequestedOverdraftMemorySegments; -} -} - @Override public int getNumberOfAvailableMemorySegments() { synchronized (availableMemorySegments) { @@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool { @SuppressWarnings("FieldAccessNotGuarded") @Override public int bestEffortGetNumOfUsedBuffers() { -return Math.max( -0, -numberOfRequestedMemorySegments -+ numberOfRequestedOverdraftMemorySegments -- availableMemorySegments.size()); +return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size()); } @Override @@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool { return false; } -checkState( -!isDestroyed, -"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); - -MemorySegment segment = networkBufferPool.requestPooledMemorySegment(); +MemorySegment segment = requestPooledMemorySegment(); if (segment != null) { availableMemorySegments.add(segment); -numberOfRequestedMemorySegments++; return true; } return false; @@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool { private MemorySegment requestOverdraftMemorySegmentFromGlobal() { assert Thread.holdsLock(availableMemorySegments); -if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) { +// if overdraft buffers(i.e. 
buffers exceeding poolSize) is greater than or equal to +// maxOverdraftBuffersPerGate, no new buffer can be requested. +if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) { return null; } +return requestPooledMemorySegment(); +} + +@Nullable +@GuardedBy("availableMemorySegments") +private MemorySegment requestPooledMemorySegment() { checkState( !isDestroyed, "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); MemorySegment segment =
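For readers following the change, the new accounting can be sketched in isolation. This is a simplified, hypothetical model and not the real LocalBufferPool: the class name `OverdraftSketch` and its methods are invented for illustration, and the ordinary-request condition (`requested >= currentPoolSize`) is an assumption, since the diff only shows the overdraft check. It illustrates how a single counter is enough once every buffer beyond currentPoolSize is treated as overdraft.

```java
/** Hypothetical, simplified model of the single-counter overdraft accounting. */
final class OverdraftSketch {
    private final int maxOverdraftBuffersPerGate;
    private int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    OverdraftSketch(int currentPoolSize, int maxOverdraftBuffersPerGate) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
    }

    /** Ordinary request; the size check here is an assumption, not shown in the diff. */
    synchronized boolean tryRequestOrdinary() {
        if (numberOfRequestedMemorySegments >= currentPoolSize) {
            return false;
        }
        numberOfRequestedMemorySegments++;
        return true;
    }

    /** Mirrors the check from the diff: buffers exceeding the pool size count as overdraft. */
    synchronized boolean tryRequestOverdraft() {
        if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) {
            return false;
        }
        numberOfRequestedMemorySegments++;
        return true;
    }

    /** Resizing needs no conversion between counters; overdraft is derived on demand. */
    synchronized void setPoolSize(int newSize) {
        currentPoolSize = newSize;
    }

    synchronized int overdraftInUse() {
        return Math.max(0, numberOfRequestedMemorySegments - currentPoolSize);
    }
}
```

In this model, shrinking the pool makes previously ordinary buffers count as overdraft automatically, which is the behaviour the removed counter had to emulate by converting between the two fields.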
[flink] 01/03: [hotfix] Fix the link to a private field in other class.
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit babefacdde5b1f4f90e74306d9cd4ccfc53cb1a7
Author: Weijie Guo
AuthorDate: Fri Apr 21 17:40:58 2023 +0800

    [hotfix] Fix the link to a private field in other class.

    This illegal link will make IDE not happy.
---
 .../org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6506ab9f942..e012c725582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -85,7 +85,7 @@ class LocalBufferPool implements BufferPool {
      *
      * BEWARE: Take special care with the interactions between this lock and
      * locks acquired before entering this class vs. locks being acquired during calls to external
-     * code inside this class, e.g. with {@link
+     * code inside this class, e.g. with {@code
      * org.apache.flink.runtime.io.network.partition.consumer.BufferManager#bufferQueue} via the
      * {@link #registeredListeners} callback.
      */
[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f80d7a4428e26b319172a56e02976bcbf5707a4f Author: Weijie Guo AuthorDate: Fri Apr 21 17:46:02 2023 +0800 [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name. --- .../runtime/io/network/buffer/LocalBufferPool.java | 11 +- .../io/network/buffer/NetworkBufferPool.java | 18 .../network/metrics/NettyShuffleMetricFactory.java | 2 +- .../io/network/buffer/LocalBufferPoolTest.java | 5 + .../io/network/buffer/NetworkBufferPoolTest.java | 24 +++--- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index e012c725582..190734c35b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool { } /** + * Estimates the number of requested buffers. + * * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. 
For * unbounded pools it returns an approximation based upon {@link * #getNumberOfRequiredMemorySegments()} */ -public int getNumberOfRequestedMemorySegments() { +public int getEstimatedNumberOfRequestedMemorySegments() { if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) { return maxNumberOfMemorySegments; } else { @@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool { } } +@VisibleForTesting +public int getNumberOfRequestedMemorySegments() { +synchronized (availableMemorySegments) { +return numberOfRequestedMemorySegments; +} +} + @VisibleForTesting public int getNumberOfRequestedOverdraftMemorySegments() { synchronized (availableMemorySegments) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 24ca78fd551..c14f89fe096 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -366,36 +366,38 @@ public class NetworkBufferPool } } -public long getNumberOfRequestedMemorySegments() { +public long getEstimatedNumberOfRequestedMemorySegments() { long requestedSegments = 0; synchronized (factoryLock) { for (LocalBufferPool bufferPool : allBufferPools) { -requestedSegments += bufferPool.getNumberOfRequestedMemorySegments(); +requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments(); } } return requestedSegments; } -public long getRequestedMemory() { -return getNumberOfRequestedMemorySegments() * memorySegmentSize; +public long getEstimatedRequestedMemory() { +return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize; } -public int getRequestedSegmentsUsage() { +public int getEstimatedRequestedSegmentsUsage() { int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments(); return 
totalNumberOfMemorySegments == 0 ? 0 : Math.toIntExact( -100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments); +100L +* getEstimatedNumberOfRequestedMemorySegments() +/ totalNumberOfMemorySegments); } @VisibleForTesting Optional getUsageWarning() { -int currentUsage = getRequestedSegmentsUsage(); +int currentUsage = getEstimatedRequestedSegmentsUsage(); Optional message = Optional.empty(); // do not log warning if the value hasn't changed to avoid spamming warnings. if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) { long totalMemory = getTotalMemory(); -long requestedMemory = getRequestedMemory(); +long requestedMemory = getEstimatedRequestedMemory(); long missingMemory = requestedMemory - totalMemory; message = Optional.of( diff --git
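The renamed usage getter boils down to a small piece of integer arithmetic, sketched here on its own. The class and method names (`UsageEstimateSketch`, `usagePercent`) are hypothetical and not part of NetworkBufferPool; only the formula is taken from the diff. Because the requested count is an estimate summed over all local pools, the result can exceed 100, which is what lets the usage warning report missing memory.

```java
/** Hypothetical standalone version of the usage computation shown in the diff. */
final class UsageEstimateSketch {
    private UsageEstimateSketch() {}

    /**
     * Returns the estimated requested segments as a percentage of the total,
     * or 0 when there are no segments at all (avoids division by zero).
     */
    static int usagePercent(long estimatedRequestedSegments, int totalSegments) {
        return totalSegments == 0
                ? 0
                : Math.toIntExact(100L * estimatedRequestedSegments / totalSegments);
    }
}
```

The multiplication is done in `long` before dividing, so the integer division truncates only once, and `Math.toIntExact` fails loudly instead of silently overflowing.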
[flink] branch master updated (026d7ccfe1d -> 3379c2c085d)
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 026d7ccfe1d [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.
     new babefacdde5 [hotfix] Fix the link to a private field in other class.
     new f80d7a4428e [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
     new 3379c2c085d [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../runtime/io/network/buffer/LocalBufferPool.java | 75 --
 .../io/network/buffer/NetworkBufferPool.java | 18 +++---
 .../network/metrics/NettyShuffleMetricFactory.java | 2 +-
 .../io/network/buffer/LocalBufferPoolTest.java | 24 ---
 .../io/network/buffer/NetworkBufferPoolTest.java | 24 +++
 5 files changed, 64 insertions(+), 79 deletions(-)
[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit c838d68a9db30c2cba021c1164a0cd464050da5a Author: Weijie Guo AuthorDate: Fri Apr 21 17:46:02 2023 +0800 [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name. --- .../runtime/io/network/buffer/LocalBufferPool.java | 11 +- .../io/network/buffer/NetworkBufferPool.java | 18 .../network/metrics/NettyShuffleMetricFactory.java | 2 +- .../io/network/buffer/LocalBufferPoolTest.java | 5 + .../io/network/buffer/NetworkBufferPoolTest.java | 24 +++--- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index e012c725582..190734c35b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool { } /** + * Estimates the number of requested buffers. + * * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. 
For * unbounded pools it returns an approximation based upon {@link * #getNumberOfRequiredMemorySegments()} */ -public int getNumberOfRequestedMemorySegments() { +public int getEstimatedNumberOfRequestedMemorySegments() { if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) { return maxNumberOfMemorySegments; } else { @@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool { } } +@VisibleForTesting +public int getNumberOfRequestedMemorySegments() { +synchronized (availableMemorySegments) { +return numberOfRequestedMemorySegments; +} +} + @VisibleForTesting public int getNumberOfRequestedOverdraftMemorySegments() { synchronized (availableMemorySegments) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 24ca78fd551..c14f89fe096 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -366,36 +366,38 @@ public class NetworkBufferPool } } -public long getNumberOfRequestedMemorySegments() { +public long getEstimatedNumberOfRequestedMemorySegments() { long requestedSegments = 0; synchronized (factoryLock) { for (LocalBufferPool bufferPool : allBufferPools) { -requestedSegments += bufferPool.getNumberOfRequestedMemorySegments(); +requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments(); } } return requestedSegments; } -public long getRequestedMemory() { -return getNumberOfRequestedMemorySegments() * memorySegmentSize; +public long getEstimatedRequestedMemory() { +return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize; } -public int getRequestedSegmentsUsage() { +public int getEstimatedRequestedSegmentsUsage() { int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments(); return 
totalNumberOfMemorySegments == 0 ? 0 : Math.toIntExact( -100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments); +100L +* getEstimatedNumberOfRequestedMemorySegments() +/ totalNumberOfMemorySegments); } @VisibleForTesting Optional getUsageWarning() { -int currentUsage = getRequestedSegmentsUsage(); +int currentUsage = getEstimatedRequestedSegmentsUsage(); Optional message = Optional.empty(); // do not log warning if the value hasn't changed to avoid spamming warnings. if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) { long totalMemory = getTotalMemory(); -long requestedMemory = getRequestedMemory(); +long requestedMemory = getEstimatedRequestedMemory(); long missingMemory = requestedMemory - totalMemory; message = Optional.of( diff --git
[flink] branch release-1.17 updated (91dfb22e0bc -> 2edc003004d)
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 91dfb22e0bc [FLINK-31834][Azure] Free up disk space before caching
     new 703a3f11ee6 [hotfix] Fix the link to a private field in other class.
     new c838d68a9db [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
     new 2edc003004d [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../runtime/io/network/buffer/LocalBufferPool.java | 75 --
 .../io/network/buffer/NetworkBufferPool.java | 18 +++---
 .../network/metrics/NettyShuffleMetricFactory.java | 2 +-
 .../io/network/buffer/LocalBufferPoolTest.java | 24 ---
 .../io/network/buffer/NetworkBufferPoolTest.java | 24 +++
 5 files changed, 64 insertions(+), 79 deletions(-)
[flink] 03/03: [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 2edc003004d0d0f1e40ef2b7e14189965183f46c Author: Weijie Guo AuthorDate: Fri Apr 21 17:35:23 2023 +0800 [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool. --- .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++ .../io/network/buffer/LocalBufferPoolTest.java | 21 +-- 2 files changed, 35 insertions(+), 58 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 190734c35b4..6d6d236f902 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -49,11 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; * * The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match - * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments + - * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In - * order to meet this requirement, when the size of this pool changes, - * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to - * each other. + * its new size. + * + * New buffers can be requested only when {@code numberOfRequestedMemorySegments < + * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the + * currentPoolSize will be dynamically regarded as overdraft buffers. 
* * Availability is defined as returning a non-overdraft segment on a subsequent {@link * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link @@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool { private int maxOverdraftBuffersPerGate; -@GuardedBy("availableMemorySegments") -private int numberOfRequestedOverdraftMemorySegments; - @GuardedBy("availableMemorySegments") private boolean isDestroyed; @@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool { } } -@VisibleForTesting -public int getNumberOfRequestedOverdraftMemorySegments() { -synchronized (availableMemorySegments) { -return numberOfRequestedOverdraftMemorySegments; -} -} - @Override public int getNumberOfAvailableMemorySegments() { synchronized (availableMemorySegments) { @@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool { @SuppressWarnings("FieldAccessNotGuarded") @Override public int bestEffortGetNumOfUsedBuffers() { -return Math.max( -0, -numberOfRequestedMemorySegments -+ numberOfRequestedOverdraftMemorySegments -- availableMemorySegments.size()); +return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size()); } @Override @@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool { return false; } -checkState( -!isDestroyed, -"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); - -MemorySegment segment = networkBufferPool.requestPooledMemorySegment(); +MemorySegment segment = requestPooledMemorySegment(); if (segment != null) { availableMemorySegments.add(segment); -numberOfRequestedMemorySegments++; return true; } return false; @@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool { private MemorySegment requestOverdraftMemorySegmentFromGlobal() { assert Thread.holdsLock(availableMemorySegments); -if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) { +// if overdraft buffers(i.e. 
buffers exceeding poolSize) is greater than or equal to +// maxOverdraftBuffersPerGate, no new buffer can be requested. +if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) { return null; } +return requestPooledMemorySegment(); +} + +@Nullable +@GuardedBy("availableMemorySegments") +private MemorySegment requestPooledMemorySegment() { checkState( !isDestroyed, "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks."); MemorySegment segment =
[flink] 01/03: [hotfix] Fix the link to a private field in other class.
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 703a3f11ee651fbbf6c4c4abc01d82ee184890d6
Author: Weijie Guo
AuthorDate: Fri Apr 21 17:40:58 2023 +0800

    [hotfix] Fix the link to a private field in other class.

    This illegal link will make IDE not happy.
---
 .../org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6506ab9f942..e012c725582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -85,7 +85,7 @@ class LocalBufferPool implements BufferPool {
      *
      * BEWARE: Take special care with the interactions between this lock and
      * locks acquired before entering this class vs. locks being acquired during calls to external
-     * code inside this class, e.g. with {@link
+     * code inside this class, e.g. with {@code
      * org.apache.flink.runtime.io.network.partition.consumer.BufferManager#bufferQueue} via the
      * {@link #registeredListeners} callback.
      */
[flink] 01/04: [FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver.
This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 058b1f5045d607a1985ee063618e3819b14fb75d Author: David Moravek AuthorDate: Fri May 5 10:09:49 2023 +0200 [FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver. --- .../highavailability/KubernetesLeaderRetrievalDriver.java | 4 .../highavailability/KubernetesLeaderRetrievalDriverFactory.java| 6 -- .../KubernetesMultipleComponentLeaderRetrievalDriverFactory.java| 1 - .../highavailability/KubernetesHighAvailabilityTestBase.java| 1 - .../KubernetesLeaderElectionAndRetrievalITCase.java | 1 - 5 files changed, 13 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java index fb3516a30ac..33e88ba903f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java @@ -49,8 +49,6 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver { private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class); -private final FlinkKubeClient kubeClient; - private final String configMapName; private final LeaderRetrievalEventHandler leaderRetrievalEventHandler; @@ -64,14 +62,12 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver { private final Function leaderInformationExtractor; public KubernetesLeaderRetrievalDriver( -FlinkKubeClient kubeClient, KubernetesConfigMapSharedWatcher configMapSharedWatcher, Executor watchExecutor, String configMapName, LeaderRetrievalEventHandler leaderRetrievalEventHandler, Function 
leaderInformationExtractor, FatalErrorHandler fatalErrorHandler) { -this.kubeClient = checkNotNull(kubeClient, "Kubernetes client"); this.configMapName = checkNotNull(configMapName, "ConfigMap name"); this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler, "LeaderRetrievalEventHandler"); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java index cba8d65e9f0..408f0f6f15f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.highavailability; -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory; @@ -35,19 +34,15 @@ import java.util.concurrent.Executor; @Deprecated public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDriverFactory { -private final FlinkKubeClient kubeClient; - private final KubernetesConfigMapSharedWatcher configMapSharedWatcher; private final Executor watchExecutor; private final String configMapName; public KubernetesLeaderRetrievalDriverFactory( -FlinkKubeClient kubeClient, KubernetesConfigMapSharedWatcher configMapSharedWatcher, Executor watchExecutor, String configMapName) { -this.kubeClient = kubeClient; this.configMapSharedWatcher = configMapSharedWatcher; this.watchExecutor = watchExecutor; this.configMapName = configMapName; @@ -57,7 +52,6 @@ public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDr 
public KubernetesLeaderRetrievalDriver createLeaderRetrievalDriver( LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler) { return new KubernetesLeaderRetrievalDriver( -kubeClient, configMapSharedWatcher, watchExecutor, configMapName, diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderRetrievalDriverFactory.java
[flink] branch master updated (2ff4e220945 -> 026d7ccfe1d)
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

    from 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
     new 058b1f5045d [FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver.
     new 075f359fdbb [FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to KubernetesUtils#getOnlyConfigMap to provide more clarity.
     new 6009e811ecf [FLINK-32010][runtime] Leader election/retrieval drivers should properly extend AutoCloseable instead of having their own close method with the same signature.
     new 026d7ccfe1d [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../KubernetesLeaderElectionDriver.java | 8 +-
 .../KubernetesLeaderRetrievalDriver.java | 19 ++--
 .../KubernetesLeaderRetrievalDriverFactory.java | 6 --
 ...netesMultipleComponentLeaderElectionDriver.java | 8 +-
 ...tipleComponentLeaderRetrievalDriverFactory.java | 1 -
 .../flink/kubernetes/utils/KubernetesUtils.java | 12 ++-
 .../KubernetesHighAvailabilityTestBase.java | 1 -
 ...KubernetesLeaderElectionAndRetrievalITCase.java | 103 +
 .../leaderelection/LeaderElectionDriver.java | 5 +-
 .../leaderretrieval/LeaderRetrievalDriver.java | 6 +-
 .../TestingLeaderElectionEventHandler.java | 3 +-
 11 files changed, 96 insertions(+), 76 deletions(-)
[flink] 03/04: [FLINK-32010][runtime] Leader election/retrieval drivers should properly extend AutoCloseable instead of having their own close method with the same signature.
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6009e811ecfee8d7378016b71ebd6bf11e364427
Author: David Moravek
AuthorDate: Fri May 5 12:31:09 2023 +0200

    [FLINK-32010][runtime] Leader election/retrieval drivers should properly extend AutoCloseable instead of having their own close method with the same signature.
---
 .../apache/flink/runtime/leaderelection/LeaderElectionDriver.java | 5 +
 .../apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java | 6 +-
 .../runtime/leaderelection/TestingLeaderElectionEventHandler.java | 3 ++-
 3 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
index 87ab1a79d83..be16ca8a5a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
@@ -28,7 +28,7 @@ package org.apache.flink.runtime.leaderelection;
  * Important: The {@link LeaderElectionDriver} could not guarantee that there is
  * no {@link LeaderElectionEventHandler} callbacks happen after {@link #close()}.
  */
-public interface LeaderElectionDriver {
+public interface LeaderElectionDriver extends AutoCloseable {
 
     /**
      * Write the current leader information to external persistent storage(e.g. Zookeeper,
@@ -49,7 +49,4 @@ public interface LeaderElectionDriver {
      * @return Return whether the driver has leadership.
      */
     boolean hasLeadership();
-
-    /** Close the services used for leader election. */
-    void close() throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
index 35f87fc1d81..48add240540 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
@@ -25,8 +25,4 @@ package org.apache.flink.runtime.leaderretrieval;
  * Important: The {@link LeaderRetrievalDriver} could not guarantee that there
  * is no {@link LeaderRetrievalEventHandler} callbacks happen after {@link #close()}.
  */
-public interface LeaderRetrievalDriver {
-
-    /** Close the services used for leader retrieval. */
-    void close() throws Exception;
-}
+public interface LeaderRetrievalDriver extends AutoCloseable {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
index af4e67b758e..5025e02de4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
@@ -31,7 +31,7 @@ import java.util.function.Consumer;
  * testing purposes.
  */
 public class TestingLeaderElectionEventHandler extends TestingLeaderBase
-        implements LeaderElectionEventHandler {
+        implements LeaderElectionEventHandler, AutoCloseable {
 
     private final Object lock = new Object();
 
@@ -135,6 +135,7 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
         }
     }
 
+    @Override
     public void close() {
         synchronized (lock) {
             running = false;
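One practical effect of having the driver interfaces extend AutoCloseable is that instances can be managed with try-with-resources. The following is a minimal sketch of that idea, not code from the commit: SketchDriver and RecordingDriver are hypothetical stand-ins, and the real Flink drivers are not constructed here.

```java
import java.util.ArrayList;
import java.util.List;

final class AutoCloseableSketch {
    /** Opens two drivers in a try-with-resources block and returns what was logged. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (SketchDriver election = new RecordingDriver("election", log);
                SketchDriver retrieval = new RecordingDriver("retrieval", log)) {
            log.add("working");
        } catch (Exception e) {
            // AutoCloseable#close is declared with `throws Exception`,
            // so the implicit close() calls have to be handled here.
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

/** Hypothetical stand-in for a driver interface; not Flink's LeaderRetrievalDriver. */
interface SketchDriver extends AutoCloseable {}

/** Records close() calls so that the closing order is observable. */
final class RecordingDriver implements SketchDriver {
    private final String name;
    private final List<String> log;

    RecordingDriver(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void close() {
        log.add("closed " + name);
    }
}
```

Resources are closed in reverse order of declaration, so the retrieval stand-in is closed before the election stand-in.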
[flink] 02/04: [FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to KubernetesUtils#getOnlyConfigMap to provide more clarity.
This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 075f359fdbbe331c2ee93479204e7fec57fc9b8b Author: David Moravek AuthorDate: Fri May 5 10:10:46 2023 +0200 [FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to KubernetesUtils#getOnlyConfigMap to provide more clarity. --- .../highavailability/KubernetesLeaderElectionDriver.java | 8 .../highavailability/KubernetesLeaderRetrievalDriver.java| 4 ++-- .../KubernetesMultipleComponentLeaderElectionDriver.java | 8 .../org/apache/flink/kubernetes/utils/KubernetesUtils.java | 12 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java index 92c3a8345e6..ec0dfe30e46 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java @@ -44,8 +44,8 @@ import java.util.concurrent.ExecutorService; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY; import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY; -import static org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps; import static org.apache.flink.kubernetes.utils.KubernetesUtils.getLeaderInformationFromConfigMap; +import static org.apache.flink.kubernetes.utils.KubernetesUtils.getOnlyConfigMap; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -216,7 +216,7 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver { @Override public void 
onModified(List configMaps) { // We should only receive events for the watched ConfigMap -final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName); +final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) { leaderElectionEventHandler.onLeaderInformationChange( @@ -226,7 +226,7 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver { @Override public void onDeleted(List configMaps) { -final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName); +final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); // The ConfigMap is deleted externally. if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) { fatalErrorHandler.onFatalError( @@ -237,7 +237,7 @@ public class KubernetesLeaderElectionDriver implements LeaderElectionDriver { @Override public void onError(List configMaps) { -final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName); +final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) { fatalErrorHandler.onFatalError( new LeaderElectionException( diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java index 33e88ba903f..f08e4622165 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java @@ -35,7 +35,7 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.function.Function; -import static org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps; 
+import static org.apache.flink.kubernetes.utils.KubernetesUtils.getOnlyConfigMap; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -105,7 +105,7 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver { @Override public void onModified(List configMaps) { -final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName); +final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); leaderRetrievalEventHandler.notifyLeaderAddress(
[flink] 04/04: [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.
This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 026d7ccfe1d6f4cfa26c9038dd05403c889d2e0d Author: David Moravek AuthorDate: Fri May 5 12:32:55 2023 +0200 [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known. --- .../KubernetesLeaderRetrievalDriver.java | 11 ++- ...KubernetesLeaderElectionAndRetrievalITCase.java | 102 + 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java index f08e4622165..035057c7441 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java @@ -98,9 +98,14 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver { @Override public void onAdded(List configMaps) { -// The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We do not -// process this -// useless event. +// The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We don't +// really need to process anything unless the retriever was started after the leader +// election has already succeeded. 
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); +final LeaderInformation leaderInformation = leaderInformationExtractor.apply(configMap); +if (!leaderInformation.isEmpty()) { + leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation); +} } @Override diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java index 4d922cbda06..d101d19b805 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.highavailability; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesExtension; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions; import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher; @@ -28,15 +29,18 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler; import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler; -import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; +import java.util.ArrayList; 
+import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; import static org.assertj.core.api.Assertions.assertThat; @@ -54,73 +58,99 @@ class KubernetesLeaderElectionAndRetrievalITCase { "akka.tcp://flink@172.20.1.21:6123/user/rpc/dispatcher"; @RegisterExtension -private static final KubernetesExtension kubernetesExtension = new KubernetesExtension(); +private static final KubernetesExtension KUBERNETES_EXTENSION = new KubernetesExtension(); + +@RegisterExtension +private static final TestExecutorExtension EXECUTOR_EXTENSION = +new TestExecutorExtension<>(Executors::newCachedThreadPool); @Test void testLeaderElectionAndRetrieval() throws Exception { -final String configMapName = LEADER_CONFIGMAP_NAME + System.currentTimeMillis(); -KubernetesLeaderElectionDriver leaderElectionDriver = null; -
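The onAdded change above can be illustrated in isolation: an "added" event is forwarded only when the ConfigMap already carries leader information, which happens when the retriever starts after the election has succeeded. This is a simplified sketch under stated assumptions. The ConfigMap is modelled as a plain Map, and ADDRESS_KEY and the listener are hypothetical; the actual connector uses KubernetesConfigMap, LeaderInformation and its own constants.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class OnAddedSketch {
    // Hypothetical key for this sketch; not the connector's actual constant.
    static final String ADDRESS_KEY = "address";

    /** Forwards the leader address from an "added" event only if one is already present. */
    static void onAdded(Map<String, String> configMapData, Consumer<String> listener) {
        String address = configMapData.get(ADDRESS_KEY);
        if (address != null && !address.isEmpty()) {
            listener.accept(address);
        }
    }

    /** Replays both situations and returns the notifications that were delivered. */
    static List<String> demo() {
        List<String> notified = new ArrayList<>();

        // Case 1: the ConfigMap was just created with empty data, nothing to report.
        onAdded(new HashMap<>(), notified::add);

        // Case 2: the retriever started late and the leader is already recorded.
        Map<String, String> elected = new HashMap<>();
        elected.put(ADDRESS_KEY, "host-a:6123");
        onAdded(elected, notified::add);

        return notified;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```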
[flink] branch master updated: [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver 2ff4e220945 is described below commit 2ff4e220945680eba21065480385148ca8d034da Author: Shammon FY AuthorDate: Thu Apr 6 13:09:19 2023 +0800 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver Close apache/flink#22362 --- .../flink/table/jdbc/BaseDatabaseMetaData.java | 792 + .../org/apache/flink/table/jdbc/DriverUri.java | 4 + .../apache/flink/table/jdbc/FlinkConnection.java | 15 +- .../flink/table/jdbc/FlinkDatabaseMetaData.java| 384 ++ .../apache/flink/table/jdbc/FlinkResultSet.java| 29 +- .../flink/table/jdbc/FlinkResultSetMetaData.java | 10 +- .../table/jdbc/utils/CloseableResultIterator.java | 24 + .../table/jdbc/utils/CollectionResultIterator.java | 45 ++ .../table/jdbc/utils/DatabaseMetaDataUtils.java| 110 +++ .../table/jdbc/utils/StatementResultIterator.java | 46 ++ .../flink/table/jdbc/FlinkConnectionTest.java | 5 +- .../table/jdbc/FlinkDatabaseMetaDataTest.java | 121 12 files changed, 1567 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java new file mode 100644 index 000..75ec33aab67 --- /dev/null +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.jdbc; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.RowIdLifetime; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +/** Base {@link DatabaseMetaData} for flink driver with not supported features. */ +public abstract class BaseDatabaseMetaData implements DatabaseMetaData { +@Override +public String getUserName() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#getUserName is not supported"); +} + +@Override +public boolean nullsAreSortedHigh() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedHigh is not supported"); +} + +@Override +public boolean nullsAreSortedLow() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedLow is not supported"); +} + +@Override +public boolean nullsAreSortedAtStart() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedAtStart is not supported"); +} + +@Override +public boolean nullsAreSortedAtEnd() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported"); +} + +@Override +public boolean supportsMixedCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( 
+"FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not supported"); +} + +@Override +public boolean storesUpperCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not supported"); +} + +@Override +public boolean storesLowerCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( +"FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not supported"); +} + +@Override +public boolean storesMixedCaseIdentifiers() throws SQLException { +throw new SQLFeatureNotSupportedException( +
[flink] branch master updated (43528be1b10 -> b814c369256)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 43528be1b10 [FLINK-32000][webui] Expose max parallelism in the vertex detail drawer. new 43688339721 [hotfix][test] Adds utility methods to OneShotLatch new 1d208f13d89 [hotfix][test] Adds helper method to testingLeaderElectionDriver for specifying session ID new ed195cfedbb [hotfix][test] Adds generic testing implementations for LeaderContender and LeaderElectionDriver new b814c369256 [FLINK-31878][runtime] Adds event processing in a separate thread to DefaultLeaderElectionService The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../DefaultLeaderElectionService.java | 62 - .../leaderelection/LeaderElectionEventHandler.java | 6 + .../DefaultLeaderElectionServiceTest.java | 280 ++--- .../TestingGenericLeaderContender.java | 116 + .../TestingGenericLeaderElectionDriver.java| 94 +++ .../TestingLeaderElectionDriver.java | 8 +- .../apache/flink/core/testutils/OneShotLatch.java | 24 ++ 7 files changed, 543 insertions(+), 47 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java
[flink] 03/04: [hotfix][test] Adds generic testing implementations for LeaderContender and LeaderElectionDriver
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ed195cfedbbcee45b9eee33b054a209b20f09b39 Author: Matthias Pohl AuthorDate: Thu Apr 20 14:42:15 2023 +0200 [hotfix][test] Adds generic testing implementations for LeaderContender and LeaderElectionDriver Signed-off-by: Matthias Pohl --- .../TestingGenericLeaderContender.java | 116 + .../TestingGenericLeaderElectionDriver.java| 94 + 2 files changed, 210 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java new file mode 100644 index 000..c385cb20c2a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * {@code TestingGenericLeaderContender} is a more generic testing implementation of the {@link + * LeaderContender} interface. + */ +public class TestingGenericLeaderContender implements LeaderContender { + +private final Object lock = new Object(); + +private final Consumer grantLeadershipConsumer; +private final Runnable revokeLeadershipRunnable; +private final Consumer handleErrorConsumer; +private final Supplier getDescriptionSupplier; + +private TestingGenericLeaderContender( +Consumer grantLeadershipConsumer, +Runnable revokeLeadershipRunnable, +Consumer handleErrorConsumer, +Supplier getDescriptionSupplier) { +this.grantLeadershipConsumer = grantLeadershipConsumer; +this.revokeLeadershipRunnable = revokeLeadershipRunnable; +this.handleErrorConsumer = handleErrorConsumer; +this.getDescriptionSupplier = getDescriptionSupplier; +} + +@Override +public void grantLeadership(UUID leaderSessionID) { +synchronized (lock) { +grantLeadershipConsumer.accept(leaderSessionID); +} +} + +@Override +public void revokeLeadership() { +synchronized (lock) { +revokeLeadershipRunnable.run(); +} +} + +@Override +public void handleError(Exception exception) { +synchronized (lock) { +handleErrorConsumer.accept(exception); +} +} + +@Override +public String getDescription() { +return getDescriptionSupplier.get(); +} + +public static Builder newBuilder() { +return new Builder(); +} + +/** {@code Builder} for creating {@code TestingGenericLeaderContender} instances. 
*/ +public static class Builder { +private Consumer grantLeadershipConsumer = ignoredSessionID -> {}; +private Runnable revokeLeadershipRunnable = () -> {}; +private Consumer handleErrorConsumer = ignoredError -> {}; +private Supplier getDescriptionSupplier = () -> "testing contender"; + +private Builder() {} + +public Builder setGrantLeadershipConsumer(Consumer grantLeadershipConsumer) { +this.grantLeadershipConsumer = grantLeadershipConsumer; +return this; +} + +public Builder setRevokeLeadershipRunnable(Runnable revokeLeadershipRunnable) { +this.revokeLeadershipRunnable = revokeLeadershipRunnable; +return this; +} + +public Builder setHandleErrorConsumer(Consumer handleErrorConsumer) { +this.handleErrorConsumer = handleErrorConsumer; +return this; +} + +public Builder setGetDescriptionSupplier(Supplier getDescriptionSupplier) { +this.getDescriptionSupplier = getDescriptionSupplier; +return this; +} + +public TestingGenericLeaderContender build() { +return new
[flink] 04/04: [FLINK-31878][runtime] Adds event processing in a separate thread to DefaultLeaderElectionService
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b814c369256f011f18fc7283d60cd4a27593d988 Author: Matthias Pohl AuthorDate: Tue Apr 18 16:17:02 2023 +0200 [FLINK-31878][runtime] Adds event processing in a separate thread to DefaultLeaderElectionService Signed-off-by: Matthias Pohl --- .../DefaultLeaderElectionService.java | 62 - .../leaderelection/LeaderElectionEventHandler.java | 6 + .../DefaultLeaderElectionServiceTest.java | 280 ++--- 3 files changed, 303 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 19a56fcbd26..f75896a6967 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -20,7 +20,10 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,10 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,7 +74,20 @@ public class DefaultLeaderElectionService // this.running=true ensures that 
leaderContender != null private LeaderElectionDriver leaderElectionDriver; +private final ExecutorService leadershipOperationExecutor; + public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) { +this( +leaderElectionDriverFactory, +Executors.newSingleThreadExecutor( +new ExecutorThreadFactory( + "DefaultLeaderElectionService-leadershipOperationExecutor"))); +} + +@VisibleForTesting +DefaultLeaderElectionService( +LeaderElectionDriverFactory leaderElectionDriverFactory, +ExecutorService leadershipOperationExecutor) { this.leaderElectionDriverFactory = checkNotNull(leaderElectionDriverFactory); this.leaderContender = null; @@ -79,6 +99,8 @@ public class DefaultLeaderElectionService this.confirmedLeaderInformation = LeaderInformation.empty(); this.running = false; + +this.leadershipOperationExecutor = leadershipOperationExecutor; } @Override @@ -118,6 +140,10 @@ public class DefaultLeaderElectionService } leaderElectionDriver.close(); + +// graceful shutdown needs to happen outside the lock to enable any outstanding +// grant/revoke events to be processed without the lock being acquired by the service +ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, leadershipOperationExecutor); } @Override @@ -184,6 +210,10 @@ public class DefaultLeaderElectionService @Override public void onGrantLeadership(UUID newLeaderSessionId) { +runInLeaderEventThread(() -> onGrantLeadershipInternal(newLeaderSessionId)); +} + +private void onGrantLeadershipInternal(UUID newLeaderSessionId) { synchronized (lock) { if (running) { issuedLeaderSessionID = newLeaderSessionId; @@ -205,6 +235,10 @@ public class DefaultLeaderElectionService @Override public void onRevokeLeadership() { +runInLeaderEventThread(this::onRevokeLeadershipInternal); +} + +private void onRevokeLeadershipInternal() { synchronized (lock) { if (running) { handleLeadershipLoss(); @@ -233,6 +267,10 @@ public class DefaultLeaderElectionService @Override public void 
onLeaderInformationChange(LeaderInformation leaderInformation) { +runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation)); +} + +private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) { synchronized (lock) { if (running) { LOG.trace( @@ -257,12 +295,28 @@ public class DefaultLeaderElectionService } } else { LOG.debug( -"Ignoring change
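The pattern in this commit, handing each driver callback to a single-threaded executor and shutting the executor down outside the lock, can be sketched without any Flink classes. The example below is an illustration only: EventThreadSketch and its method names are hypothetical, and the plain shutdown/awaitTermination sequence stands in for Flink's ExecutorUtils.gracefulShutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class EventThreadSketch {
    private final Object lock = new Object();
    private final List<String> handled = new ArrayList<>();

    // A single thread keeps the events in submission order.
    private final ExecutorService eventExecutor = Executors.newSingleThreadExecutor();

    void onGrantLeadership(String sessionId) {
        eventExecutor.execute(() -> handle("grant " + sessionId));
    }

    void onRevokeLeadership() {
        eventExecutor.execute(() -> handle("revoke"));
    }

    private void handle(String event) {
        synchronized (lock) {
            handled.add(event);
        }
    }

    /** Stops accepting events, waits for queued ones, and returns a snapshot of the result. */
    List<String> close() {
        // The wait happens outside the lock so that queued events can still acquire it.
        eventExecutor.shutdown();
        try {
            if (!eventExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                eventExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            eventExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        synchronized (lock) {
            return new ArrayList<>(handled);
        }
    }

    static List<String> demo() {
        EventThreadSketch service = new EventThreadSketch();
        service.onGrantLeadership("s1");
        service.onRevokeLeadership();
        service.onGrantLeadership("s2");
        return service.close();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller of onGrantLeadership returns immediately, so a driver thread is never blocked by whatever work the event triggers under the lock.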
[flink] 01/04: [hotfix][test] Adds utility methods to OneShotLatch
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43688339721176c8b8070fb19ac1dcda990f9017
Author: Matthias Pohl
AuthorDate: Tue Apr 18 14:36:22 2023 +0200

    [hotfix][test] Adds utility methods to OneShotLatch

    Signed-off-by: Matthias Pohl
---
 .../apache/flink/core/testutils/OneShotLatch.java | 24 ++
 1 file changed, 24 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 82c5afb787e..c0ed331fd05 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -66,6 +66,18 @@ public final class OneShotLatch {
         }
     }
 
+    /**
+     * Calls {@link #await()} and transforms any {@link InterruptedException} into a {@link
+     * RuntimeException}.
+     */
+    public void awaitQuietly() {
+        try {
+            await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Waits until {@link OneShotLatch#trigger()} is called. Once {@code #trigger()} has been called
      * this call will always return immediately.
@@ -109,6 +121,18 @@ public final class OneShotLatch {
         }
     }
 
+    /**
+     * Calls {@link #await(long, TimeUnit)} and transforms any {@link InterruptedException} or
+     * {@link TimeoutException} into a {@link RuntimeException}.
+     */
+    public void awaitQuietly(long timeout, TimeUnit timeUnit) {
+        try {
+            await(timeout, timeUnit);
+        } catch (InterruptedException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Checks if the latch was triggered.
      *
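The awaitQuietly helpers exist so that callers, typically lambdas in tests, do not have to declare checked exceptions. A rough, self-contained sketch of the same wrapping follows. QuietLatchSketch is a hypothetical class built on java.util.concurrent.CountDownLatch, not Flink's OneShotLatch, and it mirrors only the timed variant.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class QuietLatchSketch {
    private final CountDownLatch latch = new CountDownLatch(1);

    void trigger() {
        latch.countDown();
    }

    /** Checked variant: callers have to handle both exception types. */
    void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException("Latch was not triggered in time.");
        }
    }

    /** Unchecked variant: wraps both checked exceptions in a RuntimeException. */
    void awaitQuietly(long timeout, TimeUnit unit) {
        try {
            await(timeout, unit);
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /** Waits on a latch nobody triggers and reports the type of the wrapped cause. */
    static String demoTimeout() {
        QuietLatchSketch untriggered = new QuietLatchSketch();
        try {
            untriggered.awaitQuietly(10, TimeUnit.MILLISECONDS);
            return "no exception";
        } catch (RuntimeException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        QuietLatchSketch latch = new QuietLatchSketch();
        latch.trigger();
        latch.awaitQuietly(1, TimeUnit.SECONDS); // already triggered, returns without waiting
        System.out.println(demoTimeout());
    }
}
```

Like the committed helper, this sketch does not restore the thread's interrupt flag when an InterruptedException is wrapped; code outside of tests may want to do so.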
[flink] 02/04: [hotfix][test] Adds helper method to testingLeaderElectionDriver for specifying session ID
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d208f13d8954c68a10c1b380b3b0dfd00921f63
Author: Matthias Pohl
AuthorDate: Tue Apr 18 18:26:09 2023 +0200

    [hotfix][test] Adds helper method to testingLeaderElectionDriver for specifying session ID

    Signed-off-by: Matthias Pohl
---
 .../flink/runtime/leaderelection/TestingLeaderElectionDriver.java | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
index 5352e498641..9f344b30aa3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
@@ -69,13 +69,17 @@ public class TestingLeaderElectionDriver implements LeaderElectionDriver {
         return leaderInformation;
     }
 
-    public void isLeader() {
+    public void isLeader(UUID newSessionID) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            leaderElectionEventHandler.onGrantLeadership(newSessionID);
         }
     }
 
+    public void isLeader() {
+        isLeader(UUID.randomUUID());
+    }
+
     public void notLeader() {
         synchronized (lock) {
             isLeader.set(false);
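The refactoring keeps the no-argument isLeader() as a thin overload that delegates to the new variant, so a test can either pin the session ID or keep the random default. A small sketch of that overload pattern follows; SessionIdOverloadSketch is hypothetical and only records the IDs instead of calling a LeaderElectionEventHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

final class SessionIdOverloadSketch {
    private final List<UUID> grantedSessionIds = new ArrayList<>();

    /** Grants leadership with a caller-chosen session ID, useful for deterministic assertions. */
    void isLeader(UUID newSessionId) {
        grantedSessionIds.add(newSessionId);
    }

    /** Convenience overload that keeps the previous behaviour of using a random session ID. */
    void isLeader() {
        isLeader(UUID.randomUUID());
    }

    List<UUID> grantedSessionIds() {
        return grantedSessionIds;
    }

    public static void main(String[] args) {
        SessionIdOverloadSketch driver = new SessionIdOverloadSketch();
        driver.isLeader(new UUID(0L, 42L)); // fixed ID that a test can assert on
        driver.isLeader();                  // random ID, as before the change
        System.out.println(driver.grantedSessionIds());
    }
}
```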
[flink-connector-cassandra] branch main updated: [FLINK-31698] Support both Flink 1.16 and Flink 1.17 to resolve failing nightly builds
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git The following commit(s) were added to refs/heads/main by this push: new f8ac343 [FLINK-31698] Support both Flink 1.16 and Flink 1.17 to resolve failing nightly builds f8ac343 is described below commit f8ac3431f3f5b3f3a8d0e39885a446dc341ea764 Author: MartijnVisser AuthorDate: Thu Apr 13 12:08:11 2023 +0200 [FLINK-31698] Support both Flink 1.16 and Flink 1.17 to resolve failing nightly builds --- .github/workflows/push_pr.yml | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 335f108..0542242 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -16,13 +16,16 @@ # limitations under the License. -name: CI +name: Build flink-connector-cassandra on: [push, pull_request] concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true jobs: compile_and_test: +strategy: + matrix: +flink: [1.16.1, 1.17.0] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: - flink_version: 1.17.0 + flink_version: ${{ matrix.flink }}
svn commit: r61661 - in /dev/flink/flink-connector-cassandra-3.1.0-rc2: ./ flink-connector-cassandra-3.1.0-src.tgz flink-connector-cassandra-3.1.0-src.tgz.asc flink-connector-cassandra-3.1.0-src.tgz.s
Author: dannycranmer Date: Fri May 5 09:27:15 2023 New Revision: 61661 Log: Add flink-connector-cassandra-3.1.0-rc2 Added: dev/flink/flink-connector-cassandra-3.1.0-rc2/ dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz (with props) dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512 Added: dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz == Binary file - no diff available. Propchange: dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz -- svn:mime-type = application/octet-stream Added: dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc == --- dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc (added) +++ dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc Fri May 5 09:27:15 2023 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUy+4ACgkQkfnB7BJf +2Nuffw//fYf/kdvsY6E7PDBK2uh1R/ufc+nzd9F8LDzszVNAdjtHyo192ryOqpVm +hPuytYIRacIa1IJJenvN5/xKx1Db0hPvM4rxX54GYaBFdHYiUAoDGBbai8ThUp5F +Qeql37/JN/3dzsCZjETVCVN23CdFT5jxydtUPQQyj+PIwUc6Son4QOtwdRezUI4+ +GsKpGWjRS6ORxeR2Rit+HUUyuXRF1OevXKEbBiT0+DA2370J3a8r8dRJ+Det +G1YpqrlN8MBjBmBx143zuiCveSXibF8DeHGqDGP3YoZodozpZg/WNKGH6ZhjLM/p +QBgivejHAsu3Q5V5nuYS6RlqCY/Gr9yGLsAkvprjpU3U/RwYzX3SN6gQedGc5owI +zsUDNVD0hlCrR96XNKv5sIt+YWbrdoOJ5pUEHqtW7kzErNQqCB8yjYtUQgbajW74 +fNRfdP6VEpxFaP0lBr6E8QBTGtVDhZYYPmY0mCYBXV6z4fhPKwKCXPhR6rQJuCkW +LZm/QTXch4CVCNcX6vYHuB4VYfpp0Hn/GiTOm7EKXlAMKwlk5Ktgr+ahtrcE3mp8 +ebx+YFNI+XpQpM5bRBRQefAsXth0thq1CSZkCCsGcMksZvKtvBbuikFVYxgX0cpE +IwE1nxTOy/RjhzVqAI88OSmKUiZSGknIqTyfM3Cz1xu1WUBkeNU= +=orXf +-END PGP SIGNATURE- Added: dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512 == --- 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512 (added) +++ dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512 Fri May 5 09:27:15 2023 @@ -0,0 +1 @@ +15394ca5435be1625e5bb634b2aab1f6776981aca660daa1949f3d414a105062a0a9fbcfe39f73b84ffa9d438181888c36eae04d3eb33aa243b503f6a3197b73 flink-connector-cassandra-3.1.0-src.tgz
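A release candidate like the one above is normally checked by recomputing the SHA-512 digest of the source archive and comparing it with the first field of the `.sha512` file. The following is a minimal sketch of that check using only the JDK; the class name `Sha512Check` and its helper methods are hypothetical and not part of any Flink tooling, and the file is read fully into memory for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper (not part of Flink): compares data against a "<hex digest> <file name>" line. */
final class Sha512Check {

    /** Returns the lowercase hex SHA-512 digest of the given bytes. */
    static String sha512Hex(byte[] data) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-512");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(data)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-512 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** True if the first whitespace-separated field of checksumLine equals the digest of data. */
    static boolean matches(byte[] data, String checksumLine) {
        String expected = checksumLine.trim().split("\\s+")[0];
        return sha512Hex(data).equalsIgnoreCase(expected);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: Sha512Check <artifact> <artifact.sha512>");
            return;
        }
        byte[] artifact = Files.readAllBytes(Paths.get(args[0]));
        String line = new String(Files.readAllBytes(Paths.get(args[1])), StandardCharsets.UTF_8);
        System.out.println(matches(artifact, line) ? "OK" : "MISMATCH");
    }
}
```

The `.asc` signature is a separate check and needs an OpenPGP tool together with the project's published KEYS file; it is not covered by this sketch.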
[flink-connector-cassandra] annotated tag v3.1.0-rc2 updated (83945fe -> fa0dd03)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a change to annotated tag v3.1.0-rc2 in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git *** WARNING: tag v3.1.0-rc2 was modified! *** from 83945fe (commit) to fa0dd03 (tag) tagging 83945fe41cb6e7c188dfbf656b04955142600bb2 (commit) by Danny Cranmer on Fri May 5 10:27:02 2023 +0100 - Log - vv3.1.0-rc2 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUy+YACgkQkfnB7BJf 2NvODA//a84xAGHxiauAZhqus7NLyzZhqxyiRxGK+fTrtn9DgIAKSsz8aCNNMZw2 4mbwnMRF5wHlJnF31OgzLVWT4pQ4QlIwTAORfXCVFHbrUI0p9T1c3sXmy7RbiJ6L 31FeCnWyRKKuifx9zlhbnaLlG8obR0ujbGiO0DZoE8b+2l/V71PVYv9zEzu73IiL bQRO4RQz52j8qkH3yiGGaosACcyqHools2D5gOxvEW09hS9ofVLxpKTPom0v5y0d rWKp9o71dT0tsSipX5hq5v63Lxlb5TbOy+2xGvZiRtufqC8jMyW8NouDK7PiJYMF kOkGwpstwbjkrqIAkmZ77TcADmaKPliDSrG3IHYSRpdkdMQZLzUVT0+OECXzshhV ldfmT2ULMlpZAoLagi9JfkopZS3t/0Y8INjhl43TilgP5jdlAOIzKLnH7Z/z3ElL fj5mKSr9uUlmVNk4hyVLfqloC0tBO8W03+gBc6vJIPLOhGfUFyiGOZrlVoBbGTRB +l7IzUFUVSmRsc9XnQiF4m7SywyWJzZlOpKlAfGzyDx9oGI29bt8dbWYCHJb9MfW tqxjNEerD+0VM4CNZpCYHRFbsdrlEgtzxePkD3SGKwLNemwKdBzrtkgsR4VD0MXD hu/lOrp0AY1UwQtzOVF0vAT/J2/ofklPrnOgtopFecrX6vchZ9k= =J8Qs -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[flink-connector-cassandra] 01/02: [FLINK-31927] Disable cassandra driver metrics as they are not integrated with Flink metrics
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 92ee0991a268e11c7db25c1aa040bbdebfc697f7
Author: Etienne Chauchot
AuthorDate: Tue May 2 16:29:45 2023 +0200

    [FLINK-31927] Disable cassandra driver metrics as they are not integrated with Flink metrics
---
 .../apache/flink/streaming/connectors/cassandra/ClusterBuilder.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java b/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index fb537bc..9a98892 100644
--- a/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -24,12 +24,13 @@ import java.io.Serializable;
 
 /**
  * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment. The
- * cluster represents the connection that will be established to Cassandra.
+ * cluster represents the connection that will be established to Cassandra. Cassandra driver metrics
+ * are not integrated with Flink metrics, so they are disabled.
  */
 public abstract class ClusterBuilder implements Serializable {
 
     public Cluster getCluster() {
-        return buildCluster(Cluster.builder());
+        return buildCluster(Cluster.builder().withoutMetrics());
     }
 
     /**
[flink-connector-cassandra] 02/02: [FLINK-31927] Use default clusterBuilder reporting configuration in the tests
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 3170f2a5c3b8e6b520dd4db55f8795a817266261
Author: Etienne Chauchot
AuthorDate: Tue May 2 16:52:52 2023 +0200

    [FLINK-31927] Use default clusterBuilder reporting configuration in the tests
---
 .../apache/flink/connector/cassandra/CassandraTestEnvironment.java  | 2 --
 .../flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java | 5 +----
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 15b98d2..a0e649c 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -173,8 +173,6 @@ public class CassandraTestEnvironment implements TestResource {
                                         // default timeout x3 and higher than
                                         // request_timeout_in_ms at the cluster level
                                         .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
-                        .withoutJMXReporting()
-                        .withoutMetrics()
                         .build();
             }
         };
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 560ea2a..7b81e57 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -62,10 +62,7 @@ class CassandraSinkBaseTest {
                 new ClusterBuilder() {
                     @Override
                     protected Cluster buildCluster(Cluster.Builder builder) {
-                        return builder.addContactPoint("127.0.0.1")
-                                .withoutJMXReporting()
-                                .withoutMetrics()
-                                .build();
+                        return builder.addContactPoint("127.0.0.1").build();
                     }
                 },
                 CassandraSinkBaseConfig.newBuilder().build(),
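Taken together, the two FLINK-31927 commits move the metrics setting out of each user-supplied `buildCluster` and into `getCluster`, which now hands subclasses a builder that already has metrics disabled. The sketch below illustrates that template-method shape with stand-in classes only: `FakeCluster`, `SketchClusterBuilder` and `ClusterBuilderSketchDemo` are hypothetical names, and `FakeCluster` merely imitates the small part of the DataStax builder that the diff touches (it assumes metrics are on unless `withoutMetrics()` is called).

```java
import java.io.Serializable;

/** Hypothetical stand-in for com.datastax.driver.core.Cluster; not the real driver class. */
final class FakeCluster {
    final String contactPoint;
    final boolean metricsEnabled;

    private FakeCluster(String contactPoint, boolean metricsEnabled) {
        this.contactPoint = contactPoint;
        this.metricsEnabled = metricsEnabled;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String contactPoint;
        private boolean metricsEnabled = true; // assumption: metrics are on by default

        Builder addContactPoint(String address) {
            this.contactPoint = address;
            return this;
        }

        Builder withoutMetrics() {
            this.metricsEnabled = false;
            return this;
        }

        FakeCluster build() {
            return new FakeCluster(contactPoint, metricsEnabled);
        }
    }
}

/** Mirrors the shape of ClusterBuilder after the change: the base class pre-configures the builder. */
abstract class SketchClusterBuilder implements Serializable {

    FakeCluster getCluster() {
        // Metrics are switched off before the user-defined hook runs.
        return buildCluster(FakeCluster.builder().withoutMetrics());
    }

    protected abstract FakeCluster buildCluster(FakeCluster.Builder builder);
}

final class ClusterBuilderSketchDemo {

    /** A subclass written like the updated test: it no longer calls withoutMetrics() itself. */
    static FakeCluster defaultCluster() {
        return new SketchClusterBuilder() {
            @Override
            protected FakeCluster buildCluster(FakeCluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        }.getCluster();
    }

    public static void main(String[] args) {
        System.out.println("metrics enabled: " + defaultCluster().metricsEnabled);
    }
}
```

Because the default is applied before `buildCluster` runs, the two test classes in the second commit could drop their own `withoutJMXReporting()` and `withoutMetrics()` calls.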
[flink-connector-cassandra] branch v3.1 updated (ffe30a0 -> 3170f2a)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a change to branch v3.1 in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git from ffe30a0 Update version to 3.1-SNAPSHOT new 92ee099 [FLINK-31927] Disable cassandra driver metrics as they are not integrated with Flink metrics new 3170f2a [FLINK-31927] Use default clusterBuilder reporting configuration in the tests The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/streaming/connectors/cassandra/ClusterBuilder.java | 5 +++-- .../apache/flink/connector/cassandra/CassandraTestEnvironment.java | 2 -- .../flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java | 5 + 3 files changed, 4 insertions(+), 8 deletions(-)
[flink] branch master updated (00b1d4cf880 -> 43528be1b10)
This is an automated email from the ASF dual-hosted git repository. dmvk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 00b1d4cf880 [FLINK-31443][runtime] Maintain redundant task managers to speed up failover add 43528be1b10 [FLINK-32000][webui] Expose max parallelism in the vertex detail drawer. No new revisions were added by this update. Summary of changes: .../overview/detail/job-overview-drawer-detail.component.html | 11 +++ .../overview/detail/job-overview-drawer-detail.component.ts | 4 +++- 2 files changed, 14 insertions(+), 1 deletion(-)
svn commit: r61660 - /release/flink/flink-connector-jdbc-3.0.0/
Author: dannycranmer Date: Fri May 5 09:03:00 2023 New Revision: 61660 Log: Remove old release for Apache Flink Connector JDBC 3.0.0 Removed: release/flink/flink-connector-jdbc-3.0.0/
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new a73c47e2b Rebuild website a73c47e2b is described below commit a73c47e2bd27266e6ed571b89e239a1afb06575a Author: Danny Cranmer AuthorDate: Fri May 5 09:50:53 2023 +0100 Rebuild website --- content/downloads/index.html| 20 +--- content/zh/downloads/index.html | 20 +--- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/content/downloads/index.html b/content/downloads/index.html index 9aff01046..77a94f8c6 100644 --- a/content/downloads/index.html +++ b/content/downloads/index.html @@ -908,7 +908,7 @@ https://github.com/alex-shpak/hugo-book Apache Flink AWS Connectors 4.1.0 Apache Flink Cassandra Connector 3.0.0 Apache Flink Elasticsearch Connector 3.0.0 -Apache Flink JDBC Connector 3.0.0 +Apache Flink JDBC Connector 3.1.0 Apache Flink Kafka Connector 3.0.0 Apache Flink MongoDB Connector 1.0.1 Apache Flink Opensearch Connector 1.0.0 @@ -1087,14 +1087,19 @@ under the License. 
1.16.x - - Apache Flink JDBC Connector 3.0.0 - # + + Apache Flink JDBC Connector 3.1.0 + # -https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz;>Apache Flink JDBC Connector 3.0.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc;>(asc, https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512;>sha512) +https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Apache Flink JDBC Connector 3.1.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc;>(asc, https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.sha512;>sha512) This component is compatible with Apache Flink version(s): -1.16.x + +1.16.x + + +1.17.x + Apache Flink Kafka Connector 3.0.0 @@ -1520,6 +1525,7 @@ The statefun-flink-harness dependency includes a local execution en Flink AWS Connectors 4.1.0 - 2023-04-03 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-aws-4.1.0/flink-connector-aws-4.1.0-src.tgz;>Source) Flink Kafka Connector 3.0.0 - 2023-04-21 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.0/flink-connector-kafka-3.0.0-src.tgz;>Source) Flink MongoDB Connector 1.0.1 - 2023-04-24 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-mongodb-1.0.1/flink-connector-mongodb-1.0.1-src.tgz;>Source) +Flink JDBC Connector 3.1.0 - 2023-05-05 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Source) Apache Flink Stateful Functions @@ -1642,7 +1648,7 @@ The statefun-flink-harness dependency includes a local execution en Apache Flink AWS Connectors 4.1.0 Apache Flink Cassandra Connector 3.0.0 Apache Flink Elasticsearch Connector 3.0.0 -Apache Flink JDBC Connector 3.0.0 +Apache Flink JDBC Connector 
3.1.0 Apache Flink Kafka Connector 3.0.0 Apache Flink MongoDB Connector 1.0.1 Apache Flink Opensearch Connector 1.0.0 diff --git a/content/zh/downloads/index.html b/content/zh/downloads/index.html index f75ab3adc..905cbc2df 100644 --- a/content/zh/downloads/index.html +++ b/content/zh/downloads/index.html @@ -908,7 +908,7 @@ https://github.com/alex-shpak/hugo-book Apache Flink AWS Connectors 4.1.0 Apache Flink Cassandra Connector 3.0.0 Apache Flink Elasticsearch Connector 3.0.0 -Apache Flink JDBC Connector 3.0.0 +Apache Flink JDBC Connector 3.1.0 Apache Flink Kafka Connector 3.0.0 Apache Flink MongoDB Connector 1.0.1 Apache Flink Opensearch Connector 1.0.0 @@ -1087,14 +1087,19 @@ under the License. 1.16.x - - Apache Flink JDBC Connector 3.0.0 - # + + Apache Flink JDBC Connector 3.1.0 + # -https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz;>Apache Flink JDBC Connector 3.0.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc;>(asc, https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512;>sha512) +https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Apache Flink JDBC Connector 3.1.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc;>(asc,
[flink] branch master updated (8d430f51d77 -> 00b1d4cf880)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 8d430f51d77 [FLINK-31995][tests] Adds shutdown check to DirectExecutorService add 2cabb157a4f [hotfix] Keep all calculations and deriving in SlotManagerConfiguration#fromConfiguration add 92f8666ec1d [FLINK-31443][runtime] Only check idle timeout when resource allocator is supported. add 62d3a47a990 [FLINK-31443][runtime] let ResourceAllocationStrategy decide which pending/idle task managers should be release. add 8a2b2729b9f [FLINK-31443][runtime] Maintain pendingSlotAllocationRecords in PendingTaskManager add 00b1d4cf880 [FLINK-31443][runtime] Maintain redundant task managers to speed up failover No new revisions were added by this update. Summary of changes: .../generated/resource_manager_configuration.html | 2 +- .../configuration/ResourceManagerOptions.java | 9 +- .../ResourceManagerRuntimeServices.java| 4 +- .../DefaultResourceAllocationStrategy.java | 137 ++- .../slotmanager/FineGrainedSlotManager.java| 98 ++- .../slotmanager/FineGrainedTaskManagerTracker.java | 41 ++--- .../slotmanager/PendingTaskManager.java| 41 + .../slotmanager/ResourceAllocationStrategy.java| 13 ++ .../slotmanager/ResourceReleaseResult.java | 70 .../slotmanager/SlotManagerConfiguration.java | 11 +- .../slotmanager/TaskExecutorManager.java | 41 +++-- .../TaskManagerResourceInfoProvider.java | 11 -- .../apache/flink/runtime/util/ResourceCounter.java | 11 ++ .../slotmanager/DeclarativeSlotManagerBuilder.java | 3 + .../DefaultResourceAllocationStrategyTest.java | 193 - ...gerDefaultResourceAllocationStrategyITCase.java | 101 ++- .../slotmanager/FineGrainedSlotManagerTest.java| 112 ++-- .../FineGrainedTaskManagerTrackerTest.java | 46 +++-- .../SlotManagerConfigurationBuilder.java | 3 + .../TestingResourceAllocationStrategy.java | 30 +++- .../slotmanager/TestingTaskManagerInfo.java| 10 +- 
.../TestingTaskManagerResourceInfoProvider.java| 27 --- 22 files changed, 746 insertions(+), 268 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java
[flink-web] branch asf-site updated: Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 7b3bf5301 Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x
7b3bf5301 is described below

commit 7b3bf530181ee46dc206062b6f63576d42255649
Author: Danny Cranmer
AuthorDate: Thu Apr 13 15:32:26 2023 +0100

    Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x
---
 docs/data/flink_connectors.yml | 10 +++++-----
 docs/data/release_archive.yml  |  5 +++++
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/docs/data/flink_connectors.yml b/docs/data/flink_connectors.yml
index c210b3800..4c5eb7282 100644
--- a/docs/data/flink_connectors.yml
+++ b/docs/data/flink_connectors.yml
@@ -44,11 +44,11 @@ elasticsearch:
   compatibility: ["1.16.x"]
 
 jdbc:
-  name: "Apache Flink JDBC Connector 3.0.0"
-  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz"
-  source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc"
-  source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512"
-  compatibility: ["1.16.x"]
+  name: "Apache Flink JDBC Connector 3.1.0"
+  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz"
+  source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc"
+  source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.sha512"
+  compatibility: ["1.16.x", "1.17.x"]
 
 kafka:
   name: "Apache Flink Kafka Connector 3.0.0"
diff --git a/docs/data/release_archive.yml b/docs/data/release_archive.yml
index beb63d9af..3998e3b71 100644
--- a/docs/data/release_archive.yml
+++ b/docs/data/release_archive.yml
@@ -448,6 +448,11 @@ release_archive:
       version: 1.0.1
       release_date: 2023-04-24
       filename: "mongodb"
+    - name: "Flink JDBC Connector"
+      connector: "jdbc"
+      version: 3.1.0
+      release_date: 2023-05-05
+      filename: "jdbc"
 
   flink_shaded:
     -
[flink-connector-jdbc] annotated tag v3.1.0 updated (f6f1820 -> e56a23f)
This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a change to annotated tag v3.1.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git *** WARNING: tag v3.1.0 was modified! *** from f6f1820 (commit) to e56a23f (tag) tagging f6f1820de2d11febc59539e1b1339f7350532312 (commit) by Danny Cranmer on Fri May 5 09:21:42 2023 +0100 - Log - vv3.1.0 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUvJgACgkQkfnB7BJf 2Nsjxw//eNpdnKwgMMI0Ia1axsfeZeI73fRV/76Xb4+89gRhl2Ty3fd7LmF+ulDx n5AWUCVUeus4xvV5oJqvJ101iQsjhEbSv76W4mH3pFWSYwONd4divYXqg9bzhE0/ saOk6w5/u9mIEnDVM/3ShAVZkvQW7+IZY6u5ZjZFavmC6YUtNPRk9XDpkhJP5cmU bUQRPT1YyFcMpaBL1qLI6r0G2wYlxhTc00OwjQvH62hBe/DUmJbe6la/7Ri+c/DP 68ZDT2gGUpZ46dxdR1fpsHCqMDM6KwY5GNx1r9N8svoo/9AWF5iYk51jpQKDzYfR 2qEV6Ca9ZmTGHg73VNNCKIZEyCOCvFtSbb3Evf4dPNAMq/Ng9VflJ1iT3u43nzF5 0GzlOlue+kst4746TbNBHb9o15oZfALT1vdVpTrbFMuwWEC/4NvQAPIpAFJPxFRR S/VzyXGWGh1lL5z5zO9rGy1lpmEjnq51LkgiGpcqf9P93CP/KWktic1NXlpBjVx5 WAHuOnYjS0sVd52lKWVaZc7Dx52ozz0chx7G7O5liJeI/9/uWn3vnD8DrpPtXVCO oXqIfCoWZv+EMVaHBgzgmhPHuUtF4Z5t6poFuVquHanx9RGr6l9vvSciBeqyworT h8b2B69x5sj8/4GrPJu25noX8O9pB8Vu0+Tj6WCrMDcoR33K0IE= =3Fze -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r61657 - /dev/flink/flink-connector-jdbc-3.1.0-rc1/ /release/flink/flink-connector-jdbc-3.1.0/
Author: dannycranmer Date: Fri May 5 08:21:15 2023 New Revision: 61657 Log: Release flink-connector-jdbc 3.1.0 Added: release/flink/flink-connector-jdbc-3.1.0/ - copied from r61656, dev/flink/flink-connector-jdbc-3.1.0-rc1/ Removed: dev/flink/flink-connector-jdbc-3.1.0-rc1/
[flink] 01/03: [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git commit 49a320688c46e9f1992b1486e1dff08be876e6ac Author: Weijie Guo AuthorDate: Fri May 5 15:06:54 2023 +0800 [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer. --- .../runtime/tasks/StreamConfigChainer.java | 242 +++-- 1 file changed, 218 insertions(+), 24 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 100340a6ac5..7411a3e2fd9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.ManagedMemoryUseCase; @@ -36,13 +37,17 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.util.OutputTag; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -58,6 
+63,10 @@ public class StreamConfigChainer { private StreamConfig tailConfig; private int chainIndex = MAIN_NODE_ID; +private final List> outEdgesInOrder = new LinkedList<>(); + +private boolean setTailNonChainedOutputs = true; + StreamConfigChainer( OperatorID headOperatorID, StreamConfig headConfig, @@ -77,6 +86,11 @@ public class StreamConfigChainer { headConfig.setChainIndex(chainIndex); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ +@Deprecated public StreamConfigChainer chain( OperatorID operatorID, OneInputStreamOperator operator, @@ -85,11 +99,21 @@ public class StreamConfigChainer { return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ +@Deprecated public StreamConfigChainer chain( OneInputStreamOperator operator, TypeSerializer typeSerializer) { return chain(new OperatorID(), operator, typeSerializer); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ +@Deprecated public StreamConfigChainer chain( OperatorID operatorID, OneInputStreamOperator operator, @@ -97,11 +121,21 @@ public class StreamConfigChainer { return chain(operatorID, operator, typeSerializer, typeSerializer, false); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ +@Deprecated public StreamConfigChainer chain( OneInputStreamOperatorFactory operatorFactory, TypeSerializer typeSerializer) { return chain(new OperatorID(), operatorFactory, typeSerializer); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. 
+ */ +@Deprecated public StreamConfigChainer chain( OperatorID operatorID, OneInputStreamOperatorFactory operatorFactory, @@ -109,6 +143,11 @@ public class StreamConfigChainer { return chain(operatorID, operatorFactory, typeSerializer, typeSerializer, false); } +/** + * @deprecated Use {@link #chain(TypeSerializer)} or {@link #chain(TypeSerializer, + * TypeSerializer)} instead. + */ +@Deprecated private StreamConfigChainer chain( OperatorID operatorID, OneInputStreamOperator operator, @@ -123,6 +162,11 @@ public class StreamConfigChainer { createKeyedStateBackend); }
[flink] branch FLINK-18808-record-out created (now 9be2171a8fb)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git at 9be2171a8fb [FLINK-18808][streaming] Include side outputs in numRecordsOut metric This branch includes the following new commits: new 49a320688c4 [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer. new d5aa8a5ea2a [FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck. new 9be2171a8fb [FLINK-18808][streaming] Include side outputs in numRecordsOut metric The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[flink] 02/03: [FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch FLINK-18808-record-out in repository https://gitbox.apache.org/repos/asf/flink.git commit d5aa8a5ea2a9920e6368089cd8f82f0344ace6ef Author: Weijie Guo AuthorDate: Wed Apr 26 18:03:07 2023 +0800 [FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck. --- .../streaming/runtime/io/RecordWriterOutput.java | 30 +--- .../streaming/runtime/tasks/ChainingOutput.java| 17 - .../runtime/tasks/OutputWithChainingCheck.java | 42 ++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 93acf23d30b..46fc01231be 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck; import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.OutputTag; @@ -43,7 +44,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Implementation of {@link Output} that sends data using a {@link RecordWriter}. 
*/ @Internal -public class RecordWriterOutput implements WatermarkGaugeExposingOutput> { +public class RecordWriterOutput +implements WatermarkGaugeExposingOutput>, +OutputWithChainingCheck> { private RecordWriter> recordWriter; @@ -83,19 +86,36 @@ public class RecordWriterOutput implements WatermarkGaugeExposingOutput record) { +collectAndCheckIfCountNeeded(record); +} + +@Override +public void collect(OutputTag outputTag, StreamRecord record) { +collectAndCheckIfCountNeeded(outputTag, record); +} + +@Override +public boolean collectAndCheckIfCountNeeded(StreamRecord record) { if (this.outputTag != null) { // we are not responsible for emitting to the main output. -return; +return false; } pushToRecordWriter(record); +return true; } @Override -public void collect(OutputTag outputTag, StreamRecord record) { -if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) { -pushToRecordWriter(record); +public boolean collectAndCheckIfCountNeeded( +OutputTag outputTag, StreamRecord record) { +if (!OutputTag.isResponsibleFor(this.outputTag, outputTag)) { +// we are not responsible for emitting to the side-output specified by this +// OutputTag. 
+return false; } + +pushToRecordWriter(record); +return true; } private void pushToRecordWriter(StreamRecord record) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java index 0ff3785056f..5b2cf742442 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java @@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -class ChainingOutput implements WatermarkGaugeExposingOutput> { +class ChainingOutput +implements WatermarkGaugeExposingOutput>, +OutputWithChainingCheck> { private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class); protected final Input input; @@ -82,6 +84,19 @@ class ChainingOutput implements WatermarkGaugeExposingOutput> } } +@Override +public boolean collectAndCheckIfCountNeeded(StreamRecord record) { +collect(record); +return false; +} + +@Override +public boolean collectAndCheckIfCountNeeded( +OutputTag outputTag, StreamRecord record) { +collect(outputTag, record); +return false; +} + protected void pushToOperator(StreamRecord record) { try { // we know that the given outputTag matches our OutputTag so the record diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java
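The return value introduced in this commit tells the caller whether a record actually left the task through that output: the `RecordWriterOutput` variants return `true` only when they were responsible for the record and pushed it to the writer, while `ChainingOutput` always returns `false` because the record stays inside the chain. The following is a simplified sketch of that contract, not the Flink classes themselves: `CountCheckingOutput`, `NetworkOutputSketch` and `ChainedOutputSketch` are hypothetical names, and a plain `String` tag (with `null` meaning the main output) stands in for `OutputTag`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for the OutputWithChainingCheck idea in the commit. */
interface CountCheckingOutput<T> {
    /** Emits to the main output; returns true if the caller should count the record. */
    boolean collectAndCheckIfCountNeeded(T record);

    /** Emits to the side output named by tag; returns true if the caller should count the record. */
    boolean collectAndCheckIfCountNeeded(String tag, T record);
}

/** Models a network-facing output: counts only records it is responsible for and really sends. */
final class NetworkOutputSketch<T> implements CountCheckingOutput<T> {
    private final String ownTag; // null means "this is the main output"
    final List<T> sent = new ArrayList<>();

    NetworkOutputSketch(String ownTag) {
        this.ownTag = ownTag;
    }

    @Override
    public boolean collectAndCheckIfCountNeeded(T record) {
        if (ownTag != null) {
            return false; // a side output is not responsible for main-output records
        }
        sent.add(record);
        return true;
    }

    @Override
    public boolean collectAndCheckIfCountNeeded(String tag, T record) {
        if (!Objects.equals(ownTag, tag)) {
            return false; // not responsible for this side output
        }
        sent.add(record);
        return true;
    }
}

/** Models a chained output: the record is handed to the next operator, so it is never counted here. */
final class ChainedOutputSketch<T> implements CountCheckingOutput<T> {
    private final Consumer<T> nextOperator;

    ChainedOutputSketch(Consumer<T> nextOperator) {
        this.nextOperator = nextOperator;
    }

    @Override
    public boolean collectAndCheckIfCountNeeded(T record) {
        nextOperator.accept(record);
        return false;
    }

    @Override
    public boolean collectAndCheckIfCountNeeded(String tag, T record) {
        // Simplification: the real ChainingOutput also filters by tag before forwarding.
        nextOperator.accept(record);
        return false;
    }
}

final class CountCheckDemo {
    public static void main(String[] args) {
        NetworkOutputSketch<String> main = new NetworkOutputSketch<>(null);
        NetworkOutputSketch<String> side = new NetworkOutputSketch<>("late-data");
        System.out.println(main.collectAndCheckIfCountNeeded("a"));              // main output, counted
        System.out.println(side.collectAndCheckIfCountNeeded("a"));              // wrong output, ignored
        System.out.println(side.collectAndCheckIfCountNeeded("late-data", "b")); // matching side output
    }
}
```

Returning a boolean from the emit path lets the counting decision be made by the caller without a second responsibility check per record.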
[flink] 03/03: [FLINK-18808][streaming] Include side outputs in numRecordsOut metric
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9be2171a8fbeac72e34def779e1003bf099ab802
Author: Weijie Guo
AuthorDate: Wed Apr 26 19:39:51 2023 +0800

    [FLINK-18808][streaming] Include side outputs in numRecordsOut metric

    Co-authored-by: 李明
---
 .../streaming/runtime/io/RecordWriterOutput.java | 18 ++-
 .../runtime/tasks/BroadcastingOutputCollector.java | 24 +++-
 .../tasks/CopyingBroadcastingOutputCollector.java | 41 ---
 .../streaming/runtime/tasks/OperatorChain.java | 39 --
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 131 +
 .../runtime/tasks/OneInputStreamTaskTest.java | 88 ++
 .../streaming/runtime/tasks/OperatorChainTest.java | 5 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java | 103
 8 files changed, 415 insertions(+), 34 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 46fc01231be..6d1ae671153 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -60,6 +62,10 @@ public class RecordWriterOutput
     private WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
+    // Uses a dummy counter here to avoid checking the existence of numRecordsOut on the
+    // per-record path.
+    private Counter numRecordsOut = new SimpleCounter();
+
     @SuppressWarnings("unchecked")
     public RecordWriterOutput(
             RecordWriter>> recordWriter,
@@ -86,12 +92,16 @@ public class RecordWriterOutput
     @Override
     public void collect(StreamRecord record) {
-        collectAndCheckIfCountNeeded(record);
+        if (collectAndCheckIfCountNeeded(record)) {
+            numRecordsOut.inc();
+        }
     }
     @Override
     public void collect(OutputTag outputTag, StreamRecord record) {
-        collectAndCheckIfCountNeeded(outputTag, record);
+        if (collectAndCheckIfCountNeeded(outputTag, record)) {
+            numRecordsOut.inc();
+        }
     }
     @Override
@@ -168,6 +178,10 @@ public class RecordWriterOutput
         }
     }
+    public void setNumRecordsOut(Counter numRecordsOut) {
+        this.numRecordsOut = checkNotNull(numRecordsOut);
+    }
+
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
         if (isPriorityEvent
                 && event instanceof CheckpointBarrier
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
index 344215f0e51..2ddc8781eb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
@@ -17,6 +17,7 @@ package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -31,12 +32,15 @@ import java.util.Random;
 class BroadcastingOutputCollector implements WatermarkGaugeExposingOutput> {
-    protected final Output>[] outputs;
+    protected final OutputWithChainingCheck>[] outputs;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+    protected final Counter numRecordsOutForTask;
-    public BroadcastingOutputCollector(Output>[] outputs) {
+    public BroadcastingOutputCollector(
+            OutputWithChainingCheck>[] outputs, Counter numRecordsOutForTask) {
         this.outputs = outputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
     }
     @Override
@@ -73,15 +77,23 @@ class BroadcastingOutputCollector implements WatermarkGaugeExposingOutput record) {
-        for (Output> output : outputs) {
-            output.collect(record);
+        boolean emitted =
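
The "dummy counter" comment in RecordWriterOutput above describes a null-object style default: the field starts out as a throwaway counter, so the per-record path can call inc() unconditionally, and the real task-level counter is swapped in later through the setter. A minimal, self-contained sketch of that idea follows. The Counter, SimpleCounter, and Output types here are hypothetical stand-ins written for illustration (they are not Flink's classes), and the boolean countNeeded parameter only imitates the result of collectAndCheckIfCountNeeded.

```java
import java.util.Objects;

public class DummyCounterSketch {

    /** Hypothetical stand-in for a metrics counter; not Flink's interface. */
    interface Counter {
        void inc();

        long getCount();
    }

    /** Hypothetical plain in-memory counter. */
    static final class SimpleCounter implements Counter {
        private long count;

        @Override
        public void inc() {
            count++;
        }

        @Override
        public long getCount() {
            return count;
        }
    }

    /** Hypothetical output that counts emitted records. */
    static final class Output {
        // Starts as a throwaway counter so collect() never needs a null check.
        private Counter numRecordsOut = new SimpleCounter();

        void setNumRecordsOut(Counter numRecordsOut) {
            this.numRecordsOut = Objects.requireNonNull(numRecordsOut);
        }

        // countNeeded imitates the boolean returned by the real emit call.
        void collect(String record, boolean countNeeded) {
            if (countNeeded) {
                numRecordsOut.inc();
            }
        }
    }

    public static void main(String[] args) {
        Counter taskCounter = new SimpleCounter();
        Output output = new Output();

        output.collect("a", true); // lands on the dummy counter only
        output.setNumRecordsOut(taskCounter);
        output.collect("b", true); // counted by the task-level counter
        output.collect("c", false); // emitted, but not counted

        System.out.println(taskCounter.getCount()); // prints 1
    }
}
```

The trade-off in this sketch is that increments made before the setter is called land on the dummy counter and are discarded, in exchange for keeping the hot path free of a null check.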
[flink] branch release-1.17 updated: [FLINK-31834][Azure] Free up disk space before caching
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 91dfb22e0bc [FLINK-31834][Azure] Free up disk space before caching
91dfb22e0bc is described below

commit 91dfb22e0bc7ac10a9a9f59cd9da6d62a723dadd
Author: Robert Metzger
AuthorDate: Tue Apr 18 16:45:46 2023 +0200

    [FLINK-31834][Azure] Free up disk space before caching
---
 tools/azure-pipelines/e2e-template.yml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tools/azure-pipelines/e2e-template.yml b/tools/azure-pipelines/e2e-template.yml
index d9e90045947..c0cb74674d0 100644
--- a/tools/azure-pipelines/e2e-template.yml
+++ b/tools/azure-pipelines/e2e-template.yml
@@ -44,6 +44,10 @@ jobs:
           echo "##vso[task.setvariable variable=skip;]0"
         fi
       displayName: Check if Docs only PR
+    # free up disk space before running anything caching related. Caching has proven to fail in the past, due to lacking disk space.
+    - script: ./tools/azure-pipelines/free_disk_space.sh
+      target: host
+      displayName: Free up disk space
     # the cache task does not create directories on a cache miss, and can later fail when trying to tar the directory if the test haven't created it
     # this may for example happen if a given directory is only used by a subset of tests, which are run in a different 'group'
     - bash: |
@@ -96,8 +100,6 @@ jobs:
         source ./tools/ci/maven-utils.sh
         setup_maven
-        echo "Free up disk space"
-        ./tools/azure-pipelines/free_disk_space.sh
         # the APT mirrors access is based on a proposal from https://github.com/actions/runner-images/issues/7048#issuecomment-1419426054
         echo "Configure APT mirrors"