[flink-connector-pulsar] 01/01: [FLINK-32003] Upgrade pulsar-client version to work with OAuth2

2023-05-05 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch FLINK-32003
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 59303cbb9b0e5602d05e47c1825bf5f04baec4b2
Author: tison 
AuthorDate: Sat May 6 13:53:41 2023 +0800

[FLINK-32003] Upgrade pulsar-client version to work with OAuth2
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2fa2d4c..9730c3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ under the License.
 
 
 1.16.0
-2.10.1
+2.10.2
 
 2.13.4.20221013
 1.45.1
@@ -689,4 +689,4 @@ under the License.
 
 
 
-
\ No newline at end of file
+



[flink-connector-pulsar] branch FLINK-32003 created (now 59303cb)

2023-05-05 Thread tison

tison pushed a change to branch FLINK-32003
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


  at 59303cb  [FLINK-32003] Upgrade pulsar-client version to work with OAuth2

This branch includes the following new commits:

 new 59303cb  [FLINK-32003] Upgrade pulsar-client version to work with OAuth2

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] 03/03: [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3379c2c085da43bb452536c981a7fc13f39482ee
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:35:23 2023 +0800

[FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++
 .../io/network/buffer/LocalBufferPoolTest.java | 21 +--
 2 files changed, 35 insertions(+), 58 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 190734c35b4..6d6d236f902 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -49,11 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
  *
 * The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
- * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In
- * order to meet this requirement, when the size of this pool changes,
- * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to
- * each other.
+ * its new size.
+ *
+ * New buffers can be requested only when {@code numberOfRequestedMemorySegments <
+ * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the
+ * currentPoolSize will be dynamically regarded as overdraft buffers.
 *
 * Availability is defined as returning a non-overdraft segment on a subsequent {@link
 * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link
@@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool {
 
 private int maxOverdraftBuffersPerGate;
 
-@GuardedBy("availableMemorySegments")
-private int numberOfRequestedOverdraftMemorySegments;
-
 @GuardedBy("availableMemorySegments")
 private boolean isDestroyed;
 
@@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool {
 }
 }
 
-@VisibleForTesting
-public int getNumberOfRequestedOverdraftMemorySegments() {
-synchronized (availableMemorySegments) {
-return numberOfRequestedOverdraftMemorySegments;
-}
-}
-
 @Override
 public int getNumberOfAvailableMemorySegments() {
 synchronized (availableMemorySegments) {
@@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool {
 @SuppressWarnings("FieldAccessNotGuarded")
 @Override
 public int bestEffortGetNumOfUsedBuffers() {
-return Math.max(
-0,
-numberOfRequestedMemorySegments
-+ numberOfRequestedOverdraftMemorySegments
-- availableMemorySegments.size());
+return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
 }
 
 @Override
@@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool {
 return false;
 }
 
-checkState(
-!isDestroyed,
-"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
-
-MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
+MemorySegment segment = requestPooledMemorySegment();
 if (segment != null) {
 availableMemorySegments.add(segment);
-numberOfRequestedMemorySegments++;
 return true;
 }
 return false;
@@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool {
 private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
 assert Thread.holdsLock(availableMemorySegments);
 
-if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) {
+// if overdraft buffers(i.e. buffers exceeding poolSize) is greater than or equal to
+// maxOverdraftBuffersPerGate, no new buffer can be requested.
+if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) {
 return null;
 }
 
+return requestPooledMemorySegment();
+}
+
+@Nullable
+@GuardedBy("availableMemorySegments")
+private MemorySegment requestPooledMemorySegment() {
 checkState(
 !isDestroyed,
"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
 
 MemorySegment segment = 

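The rewritten invariant above can be condensed into a small, hypothetical sketch (the class below is illustrative, not Flink's `LocalBufferPool`): the overdraft count is no longer tracked in a separate field but derived as whatever exceeds `currentPoolSize`, and a request is denied once `numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate`.

```java
// Hypothetical sketch of the simplified accounting from the commit above.
class BufferAccountingSketch {
    final int currentPoolSize;
    final int maxOverdraftBuffersPerGate;
    int numberOfRequestedMemorySegments;

    BufferAccountingSketch(int poolSize, int maxOverdraft) {
        this.currentPoolSize = poolSize;
        this.maxOverdraftBuffersPerGate = maxOverdraft;
    }

    boolean tryRequestSegment() {
        // All segments beyond currentPoolSize are dynamically regarded as overdraft.
        if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) {
            return false;
        }
        numberOfRequestedMemorySegments++;
        return true;
    }

    int overdraftSegments() {
        // Derived on demand instead of maintained as a second counter.
        return Math.max(0, numberOfRequestedMemorySegments - currentPoolSize);
    }

    public static void main(String[] args) {
        BufferAccountingSketch pool = new BufferAccountingSketch(2, 1);
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " -> " + pool.tryRequestSegment()
                    + ", overdraft = " + pool.overdraftSegments());
        }
    }
}
```

With a pool size of 2 and one overdraft buffer, the first three requests succeed (the third as overdraft) and the fourth is rejected, which is exactly the `currentPoolSize + maxOverdraftBuffersPerGate` bound from the javadoc.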
[flink] 01/03: [hotfix] Fix the link to a private field in other class.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit babefacdde5b1f4f90e74306d9cd4ccfc53cb1a7
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:40:58 2023 +0800

[hotfix] Fix the link to a private field in other class.

This illegal link will make IDE not happy.
---
 .../org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6506ab9f942..e012c725582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -85,7 +85,7 @@ class LocalBufferPool implements BufferPool {
  *
 * BEWARE: Take special care with the interactions between this lock and
 * locks acquired before entering this class vs. locks being acquired during calls to external
- * code inside this class, e.g. with {@link
+ * code inside this class, e.g. with {@code
 * org.apache.flink.runtime.io.network.partition.consumer.BufferManager#bufferQueue} via the
  * {@link #registeredListeners} callback.
  */
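A toy illustration of why the fix above switches `{@link}` to `{@code}` (the class and field names below are made up): `{@link}` must resolve to an element that is accessible at the use site, so pointing it at another class's private member is flagged by IDEs and the javadoc tool, while `{@code}` merely renders its argument in code font without any resolution.

```java
// Hypothetical classes demonstrating the {@link} vs {@code} distinction.
class LinkVsCode {
    /**
     * Bad:  a javadoc {@literal {@link} OtherClass#hiddenField} reference would be
     * reported as unresolvable, because hiddenField is private to OtherClass.
     * Good: {@code OtherClass#hiddenField} is shown in code font with no
     * cross-reference check, so the documentation still compiles cleanly.
     */
    void documentedMethod() {}
}

class OtherClass {
    @SuppressWarnings("unused")
    private int hiddenField; // not visible outside this class
}
```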



[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f80d7a4428e26b319172a56e02976bcbf5707a4f
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:46:02 2023 +0800

[FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 11 +-
 .../io/network/buffer/NetworkBufferPool.java   | 18 
 .../network/metrics/NettyShuffleMetricFactory.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java |  5 +
 .../io/network/buffer/NetworkBufferPoolTest.java   | 24 +++---
 5 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e012c725582..190734c35b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool {
 }
 
 /**
+ * Estimates the number of requested buffers.
+ *
 * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
  * unbounded pools it returns an approximation based upon {@link
  * #getNumberOfRequiredMemorySegments()}
  */
-public int getNumberOfRequestedMemorySegments() {
+public int getEstimatedNumberOfRequestedMemorySegments() {
if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
 return maxNumberOfMemorySegments;
 } else {
@@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool {
 }
 }
 
+@VisibleForTesting
+public int getNumberOfRequestedMemorySegments() {
+synchronized (availableMemorySegments) {
+return numberOfRequestedMemorySegments;
+}
+}
+
 @VisibleForTesting
 public int getNumberOfRequestedOverdraftMemorySegments() {
 synchronized (availableMemorySegments) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 24ca78fd551..c14f89fe096 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -366,36 +366,38 @@ public class NetworkBufferPool
 }
 }
 
-public long getNumberOfRequestedMemorySegments() {
+public long getEstimatedNumberOfRequestedMemorySegments() {
 long requestedSegments = 0;
 synchronized (factoryLock) {
 for (LocalBufferPool bufferPool : allBufferPools) {
-requestedSegments += bufferPool.getNumberOfRequestedMemorySegments();
+requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments();
 }
 }
 return requestedSegments;
 }
 
-public long getRequestedMemory() {
-return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+public long getEstimatedRequestedMemory() {
+return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize;
 }
 
-public int getRequestedSegmentsUsage() {
+public int getEstimatedRequestedSegmentsUsage() {
 int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments();
 return totalNumberOfMemorySegments == 0
 ? 0
 : Math.toIntExact(
-100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments);
+100L
+* getEstimatedNumberOfRequestedMemorySegments()
+/ totalNumberOfMemorySegments);
 }
 
 @VisibleForTesting
Optional<String> getUsageWarning() {
-int currentUsage = getRequestedSegmentsUsage();
+int currentUsage = getEstimatedRequestedSegmentsUsage();
Optional<String> message = Optional.empty();
// do not log warning if the value hasn't changed to avoid spamming warnings.
if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) {
 long totalMemory = getTotalMemory();
-long requestedMemory = getRequestedMemory();
+long requestedMemory = getEstimatedRequestedMemory();
 long missingMemory = requestedMemory - totalMemory;
 message =
 Optional.of(
diff --git 

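The renamed estimate can be sketched as follows. The class is hypothetical; in particular, the fallback used for unbounded pools (twice the required segment count) is only an assumed stand-in for the approximation the javadoc mentions, not necessarily Flink's exact formula.

```java
// Hypothetical sketch of getEstimatedNumberOfRequestedMemorySegments():
// bounded pools report their fixed maximum, unbounded pools fall back to
// an approximation derived from the required segment count (assumed here
// to be twice that count for illustration).
class EstimateSketch {
    static final int UNBOUNDED_POOL_SIZE = Integer.MAX_VALUE;

    final int maxNumberOfMemorySegments;
    final int numberOfRequiredMemorySegments;

    EstimateSketch(int maxSegments, int requiredSegments) {
        this.maxNumberOfMemorySegments = maxSegments;
        this.numberOfRequiredMemorySegments = requiredSegments;
    }

    int getEstimatedNumberOfRequestedMemorySegments() {
        return maxNumberOfMemorySegments < UNBOUNDED_POOL_SIZE
                ? maxNumberOfMemorySegments            // bounded: exact upper bound
                : numberOfRequiredMemorySegments * 2;  // unbounded: assumed approximation
    }
}
```

The "Estimated" prefix makes it explicit at call sites (e.g. in `NettyShuffleMetricFactory` metrics) that the value is an upper-bound estimate, freeing the old name for the exact, lock-guarded counter introduced for tests.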
[flink] branch master updated (026d7ccfe1d -> 3379c2c085d)

2023-05-05 Thread guoweijie

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 026d7ccfe1d [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.
 new babefacdde5 [hotfix] Fix the link to a private field in other class.
 new f80d7a4428e [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
 new 3379c2c085d [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/buffer/LocalBufferPool.java | 75 --
 .../io/network/buffer/NetworkBufferPool.java   | 18 +++---
 .../network/metrics/NettyShuffleMetricFactory.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java | 24 ---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 24 +++
 5 files changed, 64 insertions(+), 79 deletions(-)



[flink] 02/03: [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c838d68a9db30c2cba021c1164a0cd464050da5a
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:46:02 2023 +0800

[FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 11 +-
 .../io/network/buffer/NetworkBufferPool.java   | 18 
 .../network/metrics/NettyShuffleMetricFactory.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java |  5 +
 .../io/network/buffer/NetworkBufferPoolTest.java   | 24 +++---
 5 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e012c725582..190734c35b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -285,11 +285,13 @@ class LocalBufferPool implements BufferPool {
 }
 
 /**
+ * Estimates the number of requested buffers.
+ *
 * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
  * unbounded pools it returns an approximation based upon {@link
  * #getNumberOfRequiredMemorySegments()}
  */
-public int getNumberOfRequestedMemorySegments() {
+public int getEstimatedNumberOfRequestedMemorySegments() {
if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
 return maxNumberOfMemorySegments;
 } else {
@@ -297,6 +299,13 @@ class LocalBufferPool implements BufferPool {
 }
 }
 
+@VisibleForTesting
+public int getNumberOfRequestedMemorySegments() {
+synchronized (availableMemorySegments) {
+return numberOfRequestedMemorySegments;
+}
+}
+
 @VisibleForTesting
 public int getNumberOfRequestedOverdraftMemorySegments() {
 synchronized (availableMemorySegments) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 24ca78fd551..c14f89fe096 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -366,36 +366,38 @@ public class NetworkBufferPool
 }
 }
 
-public long getNumberOfRequestedMemorySegments() {
+public long getEstimatedNumberOfRequestedMemorySegments() {
 long requestedSegments = 0;
 synchronized (factoryLock) {
 for (LocalBufferPool bufferPool : allBufferPools) {
-requestedSegments += bufferPool.getNumberOfRequestedMemorySegments();
+requestedSegments += bufferPool.getEstimatedNumberOfRequestedMemorySegments();
 }
 }
 return requestedSegments;
 }
 
-public long getRequestedMemory() {
-return getNumberOfRequestedMemorySegments() * memorySegmentSize;
+public long getEstimatedRequestedMemory() {
+return getEstimatedNumberOfRequestedMemorySegments() * memorySegmentSize;
 }
 
-public int getRequestedSegmentsUsage() {
+public int getEstimatedRequestedSegmentsUsage() {
 int totalNumberOfMemorySegments = getTotalNumberOfMemorySegments();
 return totalNumberOfMemorySegments == 0
 ? 0
 : Math.toIntExact(
-100L * getNumberOfRequestedMemorySegments() / totalNumberOfMemorySegments);
+100L
+* getEstimatedNumberOfRequestedMemorySegments()
+/ totalNumberOfMemorySegments);
 }
 
 @VisibleForTesting
Optional<String> getUsageWarning() {
-int currentUsage = getRequestedSegmentsUsage();
+int currentUsage = getEstimatedRequestedSegmentsUsage();
Optional<String> message = Optional.empty();
// do not log warning if the value hasn't changed to avoid spamming warnings.
if (currentUsage >= USAGE_WARNING_THRESHOLD && lastCheckedUsage != currentUsage) {
 long totalMemory = getTotalMemory();
-long requestedMemory = getRequestedMemory();
+long requestedMemory = getEstimatedRequestedMemory();
 long missingMemory = requestedMemory - totalMemory;
 message =
 Optional.of(
diff --git 

[flink] branch release-1.17 updated (91dfb22e0bc -> 2edc003004d)

2023-05-05 Thread guoweijie

guoweijie pushed a change to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


from 91dfb22e0bc [FLINK-31834][Azure] Free up disk space before caching
 new 703a3f11ee6 [hotfix] Fix the link to a private field in other class.
 new c838d68a9db [FLINK-31764][runtime] Introduce getNumberOfRequestedMemorySegments and rename the old one to a more appropriate name.
 new 2edc003004d [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/buffer/LocalBufferPool.java | 75 --
 .../io/network/buffer/NetworkBufferPool.java   | 18 +++---
 .../network/metrics/NettyShuffleMetricFactory.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java | 24 ---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 24 +++
 5 files changed, 64 insertions(+), 79 deletions(-)



[flink] 03/03: [FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2edc003004d0d0f1e40ef2b7e14189965183f46c
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:35:23 2023 +0800

[FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySegments in LocalBufferPool.
---
 .../runtime/io/network/buffer/LocalBufferPool.java | 72 ++
 .../io/network/buffer/LocalBufferPoolTest.java | 21 +--
 2 files changed, 35 insertions(+), 58 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 190734c35b4..6d6d236f902 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -49,11 +49,11 @@ import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
  *
 * The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
- * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
- * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In
- * order to meet this requirement, when the size of this pool changes,
- * numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to
- * each other.
+ * its new size.
+ *
+ * New buffers can be requested only when {@code numberOfRequestedMemorySegments <
+ * currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the
+ * currentPoolSize will be dynamically regarded as overdraft buffers.
 *
 * Availability is defined as returning a non-overdraft segment on a subsequent {@link
 * #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link
@@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool {
 
 private int maxOverdraftBuffersPerGate;
 
-@GuardedBy("availableMemorySegments")
-private int numberOfRequestedOverdraftMemorySegments;
-
 @GuardedBy("availableMemorySegments")
 private boolean isDestroyed;
 
@@ -306,13 +303,6 @@ class LocalBufferPool implements BufferPool {
 }
 }
 
-@VisibleForTesting
-public int getNumberOfRequestedOverdraftMemorySegments() {
-synchronized (availableMemorySegments) {
-return numberOfRequestedOverdraftMemorySegments;
-}
-}
-
 @Override
 public int getNumberOfAvailableMemorySegments() {
 synchronized (availableMemorySegments) {
@@ -331,11 +321,7 @@ class LocalBufferPool implements BufferPool {
 @SuppressWarnings("FieldAccessNotGuarded")
 @Override
 public int bestEffortGetNumOfUsedBuffers() {
-return Math.max(
-0,
-numberOfRequestedMemorySegments
-+ numberOfRequestedOverdraftMemorySegments
-- availableMemorySegments.size());
+return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
 }
 
 @Override
@@ -452,14 +438,9 @@ class LocalBufferPool implements BufferPool {
 return false;
 }
 
-checkState(
-!isDestroyed,
-"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
-
-MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
+MemorySegment segment = requestPooledMemorySegment();
 if (segment != null) {
 availableMemorySegments.add(segment);
-numberOfRequestedMemorySegments++;
 return true;
 }
 return false;
@@ -469,17 +450,25 @@ class LocalBufferPool implements BufferPool {
 private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
 assert Thread.holdsLock(availableMemorySegments);
 
-if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) {
+// if overdraft buffers(i.e. buffers exceeding poolSize) is greater than or equal to
+// maxOverdraftBuffersPerGate, no new buffer can be requested.
+if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) {
 return null;
 }
 
+return requestPooledMemorySegment();
+}
+
+@Nullable
+@GuardedBy("availableMemorySegments")
+private MemorySegment requestPooledMemorySegment() {
 checkState(
 !isDestroyed,
"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
 
 MemorySegment segment = 

[flink] 01/03: [hotfix] Fix the link to a private field in other class.

2023-05-05 Thread guoweijie

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 703a3f11ee651fbbf6c4c4abc01d82ee184890d6
Author: Weijie Guo 
AuthorDate: Fri Apr 21 17:40:58 2023 +0800

[hotfix] Fix the link to a private field in other class.

This illegal link will make IDE not happy.
---
 .../org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6506ab9f942..e012c725582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -85,7 +85,7 @@ class LocalBufferPool implements BufferPool {
  *
 * BEWARE: Take special care with the interactions between this lock and
 * locks acquired before entering this class vs. locks being acquired during calls to external
- * code inside this class, e.g. with {@link
+ * code inside this class, e.g. with {@code
 * org.apache.flink.runtime.io.network.partition.consumer.BufferManager#bufferQueue} via the
  * {@link #registeredListeners} callback.
  */



[flink] 01/04: [FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver.

2023-05-05 Thread dmvk

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 058b1f5045d607a1985ee063618e3819b14fb75d
Author: David Moravek 
AuthorDate: Fri May 5 10:09:49 2023 +0200

[FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver.
---
 .../highavailability/KubernetesLeaderRetrievalDriver.java   | 4 
 .../highavailability/KubernetesLeaderRetrievalDriverFactory.java| 6 --
 .../KubernetesMultipleComponentLeaderRetrievalDriverFactory.java| 1 -
 .../highavailability/KubernetesHighAvailabilityTestBase.java| 1 -
 .../KubernetesLeaderElectionAndRetrievalITCase.java | 1 -
 5 files changed, 13 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index fb3516a30ac..33e88ba903f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -49,8 +49,6 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
 private static final Logger LOG =
 LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class);
 
-private final FlinkKubeClient kubeClient;
-
 private final String configMapName;
 
 private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
@@ -64,14 +62,12 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
 private final Function leaderInformationExtractor;
 
 public KubernetesLeaderRetrievalDriver(
-FlinkKubeClient kubeClient,
 KubernetesConfigMapSharedWatcher configMapSharedWatcher,
 Executor watchExecutor,
 String configMapName,
 LeaderRetrievalEventHandler leaderRetrievalEventHandler,
 Function leaderInformationExtractor,
 FatalErrorHandler fatalErrorHandler) {
-this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
 this.configMapName = checkNotNull(configMapName, "ConfigMap name");
 this.leaderRetrievalEventHandler =
 checkNotNull(leaderRetrievalEventHandler, "LeaderRetrievalEventHandler");
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java
index cba8d65e9f0..408f0f6f15f 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriverFactory;
@@ -35,19 +34,15 @@ import java.util.concurrent.Executor;
 @Deprecated
public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDriverFactory {
 
-private final FlinkKubeClient kubeClient;
-
 private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
 private final Executor watchExecutor;
 
 private final String configMapName;
 
 public KubernetesLeaderRetrievalDriverFactory(
-FlinkKubeClient kubeClient,
 KubernetesConfigMapSharedWatcher configMapSharedWatcher,
 Executor watchExecutor,
 String configMapName) {
-this.kubeClient = kubeClient;
 this.configMapSharedWatcher = configMapSharedWatcher;
 this.watchExecutor = watchExecutor;
 this.configMapName = configMapName;
@@ -57,7 +52,6 @@ public class KubernetesLeaderRetrievalDriverFactory implements LeaderRetrievalDr
 public KubernetesLeaderRetrievalDriver createLeaderRetrievalDriver(
 LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler) {
 return new KubernetesLeaderRetrievalDriver(
-kubeClient,
 configMapSharedWatcher,
 watchExecutor,
 configMapName,
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderRetrievalDriverFactory.java
 

[flink] branch master updated (2ff4e220945 -> 026d7ccfe1d)

2023-05-05 Thread dmvk

dmvk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver
 new 058b1f5045d [FLINK-32010][kubernetes] Remove an unused parameter from KubernetesLeaderRetrievalDriver.
 new 075f359fdbb [FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to KubernetesUtils#getOnlyConfigMap to provide more clarity.
 new 6009e811ecf [FLINK-32010][runtime] Leader election/retrieval drivers should properly extend AutoCloseable instead of having their own close method with the same signature.
 new 026d7ccfe1d [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../KubernetesLeaderElectionDriver.java|   8 +-
 .../KubernetesLeaderRetrievalDriver.java   |  19 ++--
 .../KubernetesLeaderRetrievalDriverFactory.java|   6 --
 ...netesMultipleComponentLeaderElectionDriver.java |   8 +-
 ...tipleComponentLeaderRetrievalDriverFactory.java |   1 -
 .../flink/kubernetes/utils/KubernetesUtils.java|  12 ++-
 .../KubernetesHighAvailabilityTestBase.java|   1 -
 ...KubernetesLeaderElectionAndRetrievalITCase.java | 103 +
 .../leaderelection/LeaderElectionDriver.java   |   5 +-
 .../leaderretrieval/LeaderRetrievalDriver.java |   6 +-
 .../TestingLeaderElectionEventHandler.java |   3 +-
 11 files changed, 96 insertions(+), 76 deletions(-)



[flink] 03/04: [FLINK-32010][runtime] Leader election/retrieval drivers should properly extend AutoCloseable instead of having their own close method with the same signature.

2023-05-05 Thread dmvk
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6009e811ecfee8d7378016b71ebd6bf11e364427
Author: David Moravek 
AuthorDate: Fri May 5 12:31:09 2023 +0200

[FLINK-32010][runtime] Leader election/retrieval drivers should properly 
extend AutoCloseable instead of having their own close method with the same 
signature.
---
 .../apache/flink/runtime/leaderelection/LeaderElectionDriver.java   | 5 +
 .../apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java | 6 +-
 .../runtime/leaderelection/TestingLeaderElectionEventHandler.java   | 3 ++-
 3 files changed, 4 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
index 87ab1a79d83..be16ca8a5a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
@@ -28,7 +28,7 @@ package org.apache.flink.runtime.leaderelection;
  * Important: The {@link LeaderElectionDriver} could not 
guarantee that there is
  * no {@link LeaderElectionEventHandler} callbacks happen after {@link 
#close()}.
  */
-public interface LeaderElectionDriver {
+public interface LeaderElectionDriver extends AutoCloseable {
 
 /**
  * Write the current leader information to external persistent 
storage(e.g. Zookeeper,
@@ -49,7 +49,4 @@ public interface LeaderElectionDriver {
  * @return Return whether the driver has leadership.
  */
 boolean hasLeadership();
-
-/** Close the services used for leader election. */
-void close() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
index 35f87fc1d81..48add240540 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
@@ -25,8 +25,4 @@ package org.apache.flink.runtime.leaderretrieval;
  * Important: The {@link LeaderRetrievalDriver} could not 
guarantee that there
  * is no {@link LeaderRetrievalEventHandler} callbacks happen after {@link 
#close()}.
  */
-public interface LeaderRetrievalDriver {
-
-/** Close the services used for leader retrieval. */
-void close() throws Exception;
-}
+public interface LeaderRetrievalDriver extends AutoCloseable {}
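The practical benefit of extending `AutoCloseable` instead of redeclaring an identical `close()` is that driver implementations can participate in try-with-resources and generic close utilities. A minimal sketch of the effect, using hypothetical names rather than Flink's actual interfaces:

```java
// Hypothetical sketch: a driver interface that inherits close() from
// AutoCloseable instead of declaring its own method with the same signature.
interface Driver extends AutoCloseable {
    boolean hasLeadership();
    // close() is inherited from AutoCloseable; no redeclaration needed.
}

public class DriverDemo {
    static final StringBuilder LOG = new StringBuilder();

    static class TestingDriver implements Driver {
        @Override
        public boolean hasLeadership() { return true; }

        @Override
        public void close() { LOG.append("closed"); }
    }

    public static String run() throws Exception {
        LOG.setLength(0);
        // try-with-resources closes the driver automatically on scope exit.
        try (TestingDriver driver = new TestingDriver()) {
            LOG.append(driver.hasLeadership() ? "leader;" : "follower;");
        }
        return LOG.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "leader;closed"
    }
}
```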
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
index af4e67b758e..5025e02de4a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
@@ -31,7 +31,7 @@ import java.util.function.Consumer;
  * testing purposes.
  */
 public class TestingLeaderElectionEventHandler extends TestingLeaderBase
-implements LeaderElectionEventHandler {
+implements LeaderElectionEventHandler, AutoCloseable {
 
 private final Object lock = new Object();
 
@@ -135,6 +135,7 @@ public class TestingLeaderElectionEventHandler extends 
TestingLeaderBase
 }
 }
 
+@Override
 public void close() {
 synchronized (lock) {
 running = false;



[flink] 02/04: [FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to KubernetesUtils#getOnlyConfigMap to provide more clarity.

2023-05-05 Thread dmvk
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 075f359fdbbe331c2ee93479204e7fec57fc9b8b
Author: David Moravek 
AuthorDate: Fri May 5 10:10:46 2023 +0200

[FLINK-32010][kubernetes] Rename KubernetesUtils#checkConfigMaps to 
KubernetesUtils#getOnlyConfigMap to provide more clarity.
---
 .../highavailability/KubernetesLeaderElectionDriver.java |  8 
 .../highavailability/KubernetesLeaderRetrievalDriver.java|  4 ++--
 .../KubernetesMultipleComponentLeaderElectionDriver.java |  8 
 .../org/apache/flink/kubernetes/utils/KubernetesUtils.java   | 12 
 4 files changed, 18 insertions(+), 14 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
index 92c3a8345e6..ec0dfe30e46 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
@@ -44,8 +44,8 @@ import java.util.concurrent.ExecutorService;
 import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
 import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
-import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
 import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.getLeaderInformationFromConfigMap;
+import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.getOnlyConfigMap;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -216,7 +216,7 @@ public class KubernetesLeaderElectionDriver implements 
LeaderElectionDriver {
 @Override
public void onModified(List<KubernetesConfigMap> configMaps) {
 // We should only receive events for the watched ConfigMap
-final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName);
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
 
 if (KubernetesLeaderElector.hasLeadership(configMap, 
lockIdentity)) {
 leaderElectionEventHandler.onLeaderInformationChange(
@@ -226,7 +226,7 @@ public class KubernetesLeaderElectionDriver implements 
LeaderElectionDriver {
 
 @Override
public void onDeleted(List<KubernetesConfigMap> configMaps) {
-final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName);
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
 // The ConfigMap is deleted externally.
 if (KubernetesLeaderElector.hasLeadership(configMap, 
lockIdentity)) {
 fatalErrorHandler.onFatalError(
@@ -237,7 +237,7 @@ public class KubernetesLeaderElectionDriver implements 
LeaderElectionDriver {
 
 @Override
public void onError(List<KubernetesConfigMap> configMaps) {
-final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName);
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
 if (KubernetesLeaderElector.hasLeadership(configMap, 
lockIdentity)) {
 fatalErrorHandler.onFatalError(
 new LeaderElectionException(
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index 33e88ba903f..f08e4622165 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -35,7 +35,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.function.Function;
 
-import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.checkConfigMaps;
+import static 
org.apache.flink.kubernetes.utils.KubernetesUtils.getOnlyConfigMap;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -105,7 +105,7 @@ public class KubernetesLeaderRetrievalDriver implements 
LeaderRetrievalDriver {
 
 @Override
public void onModified(List<KubernetesConfigMap> configMaps) {
-final KubernetesConfigMap configMap = checkConfigMaps(configMaps, configMapName);
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
 leaderRetrievalEventHandler.notifyLeaderAddress(
 

[flink] 04/04: [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.

2023-05-05 Thread dmvk
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 026d7ccfe1d6f4cfa26c9038dd05403c889d2e0d
Author: David Moravek 
AuthorDate: Fri May 5 12:32:55 2023 +0200

[FLINK-32010][kubernetes] Properly handle 
KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in 
case the leader is already known.
---
 .../KubernetesLeaderRetrievalDriver.java   |  11 ++-
 ...KubernetesLeaderElectionAndRetrievalITCase.java | 102 +
 2 files changed, 74 insertions(+), 39 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index f08e4622165..035057c7441 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -98,9 +98,14 @@ public class KubernetesLeaderRetrievalDriver implements 
LeaderRetrievalDriver {
 
 @Override
public void onAdded(List<KubernetesConfigMap> configMaps) {
-// The ConfigMap is created by KubernetesLeaderElectionDriver with 
empty data. We do not
-// process this
-// useless event.
+// The ConfigMap is created by KubernetesLeaderElectionDriver with 
empty data. We don't
+// really need to process anything unless the retriever was 
started after the leader
+// election has already succeeded.
+final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName);
+final LeaderInformation leaderInformation = leaderInformationExtractor.apply(configMap);
+if (!leaderInformation.isEmpty()) {
+leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation);
+}
 }
 
 @Override
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
index 4d922cbda06..d101d19b805 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.highavailability;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.KubernetesExtension;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
@@ -28,15 +29,18 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler;
 import 
org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler;
-import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -54,73 +58,99 @@ class KubernetesLeaderElectionAndRetrievalITCase {
 "akka.tcp://flink@172.20.1.21:6123/user/rpc/dispatcher";
 
 @RegisterExtension
-private static final KubernetesExtension kubernetesExtension = new 
KubernetesExtension();
+private static final KubernetesExtension KUBERNETES_EXTENSION = new 
KubernetesExtension();
+
+@RegisterExtension
+private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+new TestExecutorExtension<>(Executors::newCachedThreadPool);
 
 @Test
 void testLeaderElectionAndRetrieval() throws Exception {
-final String configMapName = LEADER_CONFIGMAP_NAME + 
System.currentTimeMillis();
-KubernetesLeaderElectionDriver leaderElectionDriver = null;
-   

[flink] branch master updated: [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver

2023-05-05 Thread libenchao
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ff4e220945 [FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for 
jdbc driver
2ff4e220945 is described below

commit 2ff4e220945680eba21065480385148ca8d034da
Author: Shammon FY 
AuthorDate: Thu Apr 6 13:09:19 2023 +0800

[FLINK-31544][jdbc-driver] Introduce DatabaseMetaData for jdbc driver

Close apache/flink#22362
---
 .../flink/table/jdbc/BaseDatabaseMetaData.java | 792 +
 .../org/apache/flink/table/jdbc/DriverUri.java |   4 +
 .../apache/flink/table/jdbc/FlinkConnection.java   |  15 +-
 .../flink/table/jdbc/FlinkDatabaseMetaData.java| 384 ++
 .../apache/flink/table/jdbc/FlinkResultSet.java|  29 +-
 .../flink/table/jdbc/FlinkResultSetMetaData.java   |  10 +-
 .../table/jdbc/utils/CloseableResultIterator.java  |  24 +
 .../table/jdbc/utils/CollectionResultIterator.java |  45 ++
 .../table/jdbc/utils/DatabaseMetaDataUtils.java| 110 +++
 .../table/jdbc/utils/StatementResultIterator.java  |  46 ++
 .../flink/table/jdbc/FlinkConnectionTest.java  |   5 +-
 .../table/jdbc/FlinkDatabaseMetaDataTest.java  | 121 
 12 files changed, 1567 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
new file mode 100644
index 000..75ec33aab67
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/BaseDatabaseMetaData.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.RowIdLifetime;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+/** Base {@link DatabaseMetaData} for flink driver with not supported 
features. */
+public abstract class BaseDatabaseMetaData implements DatabaseMetaData {
+@Override
+public String getUserName() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#getUserName is not supported");
+}
+
+@Override
+public boolean nullsAreSortedHigh() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedHigh is not supported");
+}
+
+@Override
+public boolean nullsAreSortedLow() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedLow is not supported");
+}
+
+@Override
+public boolean nullsAreSortedAtStart() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedAtStart is not 
supported");
+}
+
+@Override
+public boolean nullsAreSortedAtEnd() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#nullsAreSortedAtEnd is not supported");
+}
+
+@Override
+public boolean supportsMixedCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#supportsMixedCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesUpperCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#storesUpperCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesLowerCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+"FlinkDatabaseMetaData#storesLowerCaseIdentifiers is not 
supported");
+}
+
+@Override
+public boolean storesMixedCaseIdentifiers() throws SQLException {
+throw new SQLFeatureNotSupportedException(
+

[flink] branch master updated (43528be1b10 -> b814c369256)

2023-05-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 43528be1b10 [FLINK-32000][webui] Expose max parallelism in the vertex 
detail drawer.
 new 43688339721 [hotfix][test] Adds utility methods to OneShotLatch
 new 1d208f13d89 [hotfix][test] Adds helper method to 
testingLeaderElectionDriver for specifying session ID
 new ed195cfedbb [hotfix][test] Adds generic testing implementations for 
LeaderContender and LeaderElectionDriver
 new b814c369256 [FLINK-31878][runtime] Adds event processing in a separate 
thread to DefaultLeaderElectionService

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../DefaultLeaderElectionService.java  |  62 -
 .../leaderelection/LeaderElectionEventHandler.java |   6 +
 .../DefaultLeaderElectionServiceTest.java  | 280 ++---
 .../TestingGenericLeaderContender.java | 116 +
 .../TestingGenericLeaderElectionDriver.java|  94 +++
 .../TestingLeaderElectionDriver.java   |   8 +-
 .../apache/flink/core/testutils/OneShotLatch.java  |  24 ++
 7 files changed, 543 insertions(+), 47 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderElectionDriver.java



[flink] 03/04: [hotfix][test] Adds generic testing implementations for LeaderContender and LeaderElectionDriver

2023-05-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed195cfedbbcee45b9eee33b054a209b20f09b39
Author: Matthias Pohl 
AuthorDate: Thu Apr 20 14:42:15 2023 +0200

[hotfix][test] Adds generic testing implementations for LeaderContender and 
LeaderElectionDriver

Signed-off-by: Matthias Pohl 
---
 .../TestingGenericLeaderContender.java | 116 +
 .../TestingGenericLeaderElectionDriver.java|  94 +
 2 files changed, 210 insertions(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
new file mode 100644
index 000..c385cb20c2a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * {@code TestingGenericLeaderContender} is a more generic testing 
implementation of the {@link
+ * LeaderContender} interface.
+ */
+public class TestingGenericLeaderContender implements LeaderContender {
+
+private final Object lock = new Object();
+
+private final Consumer<UUID> grantLeadershipConsumer;
+private final Runnable revokeLeadershipRunnable;
+private final Consumer<Exception> handleErrorConsumer;
+private final Supplier<String> getDescriptionSupplier;
+
+private TestingGenericLeaderContender(
+Consumer<UUID> grantLeadershipConsumer,
+Runnable revokeLeadershipRunnable,
+Consumer<Exception> handleErrorConsumer,
+Supplier<String> getDescriptionSupplier) {
+this.grantLeadershipConsumer = grantLeadershipConsumer;
+this.revokeLeadershipRunnable = revokeLeadershipRunnable;
+this.handleErrorConsumer = handleErrorConsumer;
+this.getDescriptionSupplier = getDescriptionSupplier;
+}
+
+@Override
+public void grantLeadership(UUID leaderSessionID) {
+synchronized (lock) {
+grantLeadershipConsumer.accept(leaderSessionID);
+}
+}
+
+@Override
+public void revokeLeadership() {
+synchronized (lock) {
+revokeLeadershipRunnable.run();
+}
+}
+
+@Override
+public void handleError(Exception exception) {
+synchronized (lock) {
+handleErrorConsumer.accept(exception);
+}
+}
+
+@Override
+public String getDescription() {
+return getDescriptionSupplier.get();
+}
+
+public static Builder newBuilder() {
+return new Builder();
+}
+
+/** {@code Builder} for creating {@code TestingGenericLeaderContender} 
instances. */
+public static class Builder {
+private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> {};
+private Runnable revokeLeadershipRunnable = () -> {};
+private Consumer<Exception> handleErrorConsumer = ignoredError -> {};
+private Supplier<String> getDescriptionSupplier = () -> "testing contender";
+
+private Builder() {}
+
+public Builder setGrantLeadershipConsumer(Consumer<UUID> grantLeadershipConsumer) {
+this.grantLeadershipConsumer = grantLeadershipConsumer;
+return this;
+}
+
+public Builder setRevokeLeadershipRunnable(Runnable revokeLeadershipRunnable) {
+this.revokeLeadershipRunnable = revokeLeadershipRunnable;
+return this;
+}
+
+public Builder setHandleErrorConsumer(Consumer<Exception> handleErrorConsumer) {
+this.handleErrorConsumer = handleErrorConsumer;
+return this;
+}
+
+public Builder setGetDescriptionSupplier(Supplier<String> getDescriptionSupplier) {
+this.getDescriptionSupplier = getDescriptionSupplier;
+return this;
+}
+
+public TestingGenericLeaderContender build() {
+return new 

[flink] 04/04: [FLINK-31878][runtime] Adds event processing in a separate thread to DefaultLeaderElectionService

2023-05-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b814c369256f011f18fc7283d60cd4a27593d988
Author: Matthias Pohl 
AuthorDate: Tue Apr 18 16:17:02 2023 +0200

[FLINK-31878][runtime] Adds event processing in a separate thread to 
DefaultLeaderElectionService

Signed-off-by: Matthias Pohl 
---
 .../DefaultLeaderElectionService.java  |  62 -
 .../leaderelection/LeaderElectionEventHandler.java |   6 +
 .../DefaultLeaderElectionServiceTest.java  | 280 ++---
 3 files changed, 303 insertions(+), 45 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 19a56fcbd26..f75896a6967 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -20,7 +20,10 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +33,10 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -67,7 +74,20 @@ public class DefaultLeaderElectionService
 // this.running=true ensures that leaderContender != null
 private LeaderElectionDriver leaderElectionDriver;
 
+private final ExecutorService leadershipOperationExecutor;
+
 public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
+this(
+leaderElectionDriverFactory,
+Executors.newSingleThreadExecutor(
+new ExecutorThreadFactory(
+
"DefaultLeaderElectionService-leadershipOperationExecutor")));
+}
+
+@VisibleForTesting
+DefaultLeaderElectionService(
+LeaderElectionDriverFactory leaderElectionDriverFactory,
+ExecutorService leadershipOperationExecutor) {
 this.leaderElectionDriverFactory = 
checkNotNull(leaderElectionDriverFactory);
 
 this.leaderContender = null;
@@ -79,6 +99,8 @@ public class DefaultLeaderElectionService
 this.confirmedLeaderInformation = LeaderInformation.empty();
 
 this.running = false;
+
+this.leadershipOperationExecutor = leadershipOperationExecutor;
 }
 
 @Override
@@ -118,6 +140,10 @@ public class DefaultLeaderElectionService
 }
 
 leaderElectionDriver.close();
+
+// graceful shutdown needs to happen outside the lock to enable any 
outstanding
+// grant/revoke events to be processed without the lock being acquired 
by the service
+ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
leadershipOperationExecutor);
 }
 
 @Override
@@ -184,6 +210,10 @@ public class DefaultLeaderElectionService
 
 @Override
 public void onGrantLeadership(UUID newLeaderSessionId) {
+runInLeaderEventThread(() -> 
onGrantLeadershipInternal(newLeaderSessionId));
+}
+
+private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
 synchronized (lock) {
 if (running) {
 issuedLeaderSessionID = newLeaderSessionId;
@@ -205,6 +235,10 @@ public class DefaultLeaderElectionService
 
 @Override
 public void onRevokeLeadership() {
+runInLeaderEventThread(this::onRevokeLeadershipInternal);
+}
+
+private void onRevokeLeadershipInternal() {
 synchronized (lock) {
 if (running) {
 handleLeadershipLoss();
@@ -233,6 +267,10 @@ public class DefaultLeaderElectionService
 
 @Override
 public void onLeaderInformationChange(LeaderInformation leaderInformation) 
{
+runInLeaderEventThread(() -> 
onLeaderInformationChangeInternal(leaderInformation));
+}
+
+private void onLeaderInformationChangeInternal(LeaderInformation 
leaderInformation) {
 synchronized (lock) {
 if (running) {
 LOG.trace(
@@ -257,12 +295,28 @@ public class DefaultLeaderElectionService
 }
 } else {
 LOG.debug(
-"Ignoring change 
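The core idea of the commit above is to funnel every grant/revoke/leader-information event through a dedicated single-thread executor, so the driver's callbacks are processed sequentially and off the caller thread, and graceful shutdown drains outstanding events. A condensed, self-contained sketch of that threading scheme (class and method names are illustrative, not the actual service):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-thread executor serializes leadership
// events in submission order without holding a service lock on the
// driver's callback thread.
public class LeaderEventLoopDemo {
    private final ExecutorService eventExecutor = Executors.newSingleThreadExecutor();
    final List<String> processed = new CopyOnWriteArrayList<>();

    void onGrant(String sessionId) {
        eventExecutor.execute(() -> processed.add("grant:" + sessionId));
    }

    void onRevoke() {
        eventExecutor.execute(() -> processed.add("revoke"));
    }

    void close() throws InterruptedException {
        // Graceful shutdown lets queued events finish before the service stops.
        eventExecutor.shutdown();
        eventExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static List<String> run() throws InterruptedException {
        LeaderEventLoopDemo service = new LeaderEventLoopDemo();
        service.onGrant("s1");
        service.onRevoke();
        service.close();
        return service.processed; // events handled strictly in submission order
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "[grant:s1, revoke]"
    }
}
```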

[flink] 01/04: [hotfix][test] Adds utility methods to OneShotLatch

2023-05-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43688339721176c8b8070fb19ac1dcda990f9017
Author: Matthias Pohl 
AuthorDate: Tue Apr 18 14:36:22 2023 +0200

[hotfix][test] Adds utility methods to OneShotLatch

Signed-off-by: Matthias Pohl 
---
 .../apache/flink/core/testutils/OneShotLatch.java  | 24 ++
 1 file changed, 24 insertions(+)

diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 82c5afb787e..c0ed331fd05 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -66,6 +66,18 @@ public final class OneShotLatch {
 }
 }
 
+/**
+ * Calls {@link #await()} and transforms any {@link InterruptedException} 
into a {@link
+ * RuntimeException}.
+ */
+public void awaitQuietly() {
+try {
+await();
+} catch (InterruptedException e) {
+throw new RuntimeException(e);
+}
+}
+
 /**
  * Waits until {@link OneShotLatch#trigger()} is called. Once {@code 
#trigger()} has been called
  * this call will always return immediately.
@@ -109,6 +121,18 @@ public final class OneShotLatch {
 }
 }
 
+/**
+ * Calls {@link #await(long, TimeUnit)} and transforms any {@link 
InterruptedException} or
+ * {@link TimeoutException} into a {@link RuntimeException}.
+ */
+public void awaitQuietly(long timeout, TimeUnit timeUnit) {
+try {
+await(timeout, timeUnit);
+} catch (InterruptedException | TimeoutException e) {
+throw new RuntimeException(e);
+}
+}
+
 /**
  * Checks if the latch was triggered.
  *
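The `awaitQuietly` helpers added above exist so the latch can be awaited inside lambdas (`Runnable`, `Supplier`, etc.) that cannot throw checked exceptions. The same pattern, applied to a plain `CountDownLatch` so the sketch is self-contained (`QuietLatch` is a hypothetical stand-in, not `OneShotLatch` itself):

```java
import java.util.concurrent.CountDownLatch;

// Sketch of the "quiet await" pattern: wrap the checked InterruptedException
// in an unchecked exception so the call is usable in lambdas.
public class QuietLatch {
    private final CountDownLatch latch = new CountDownLatch(1);

    public void trigger() { latch.countDown(); }

    /** Like await(), but rethrows InterruptedException as a RuntimeException. */
    public void awaitQuietly() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RuntimeException(e);
        }
    }

    public static boolean demo() {
        QuietLatch latch = new QuietLatch();
        latch.trigger();
        latch.awaitQuietly(); // no try/catch needed at the call site
        return true;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true"
    }
}
```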



[flink] 02/04: [hotfix][test] Adds helper method to testingLeaderElectionDriver for specifying session ID

2023-05-05 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d208f13d8954c68a10c1b380b3b0dfd00921f63
Author: Matthias Pohl 
AuthorDate: Tue Apr 18 18:26:09 2023 +0200

[hotfix][test] Adds helper method to testingLeaderElectionDriver for 
specifying session ID

Signed-off-by: Matthias Pohl 
---
 .../flink/runtime/leaderelection/TestingLeaderElectionDriver.java | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
index 5352e498641..9f344b30aa3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
@@ -69,13 +69,17 @@ public class TestingLeaderElectionDriver implements LeaderElectionDriver {
         return leaderInformation;
     }
 
-    public void isLeader() {
+    public void isLeader(UUID newSessionID) {
         synchronized (lock) {
             isLeader.set(true);
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            leaderElectionEventHandler.onGrantLeadership(newSessionID);
         }
     }
 
+    public void isLeader() {
+        isLeader(UUID.randomUUID());
+    }
+
     public void notLeader() {
         synchronized (lock) {
             isLeader.set(false);



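The change above keeps the old zero-argument `isLeader()` as a thin overload that delegates with a random session ID, so existing tests stay untouched while new tests can pin a deterministic ID. A generic sketch of that overload-delegation pattern (class and method names are illustrative, not the Flink test driver):

```java
import java.util.UUID;

public class SessionSketch {

    private UUID currentSession;

    // New explicit variant: deterministic tests pass their own ID.
    public void grantLeadership(UUID sessionId) {
        this.currentSession = sessionId;
    }

    // Old signature kept as a thin delegating overload with a random default.
    public void grantLeadership() {
        grantLeadership(UUID.randomUUID());
    }

    public UUID currentSession() {
        return currentSession;
    }

    public static void main(String[] args) {
        SessionSketch sketch = new SessionSketch();
        sketch.grantLeadership(); // existing call sites keep working unchanged
        System.out.println("session = " + sketch.currentSession());
    }
}
```

Because the default is produced inside the delegating overload, no caller behavior changes, yet the session ID becomes injectable wherever a test needs to assert on it.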
[flink-connector-cassandra] branch main updated: [FLINK-31698] Support both Flink 1.16 and Flink 1.17 to resolve failing nightly builds

2023-05-05 Thread echauchot
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
 new f8ac343  [FLINK-31698] Support both Flink 1.16 and Flink 1.17 to 
resolve failing nightly builds
f8ac343 is described below

commit f8ac3431f3f5b3f3a8d0e39885a446dc341ea764
Author: MartijnVisser 
AuthorDate: Thu Apr 13 12:08:11 2023 +0200

[FLINK-31698] Support both Flink 1.16 and Flink 1.17 to resolve failing 
nightly builds
---
 .github/workflows/push_pr.yml | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 335f108..0542242 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -16,13 +16,16 @@
 # limitations under the License.
 

 
-name: CI
+name: Build flink-connector-cassandra
 on: [push, pull_request]
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
 jobs:
   compile_and_test:
+    strategy:
+      matrix:
+        flink: [1.16.1, 1.17.0]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
-      flink_version: 1.17.0
+      flink_version: ${{ matrix.flink }}



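For context, the `strategy.matrix` block in the commit above makes GitHub Actions run the reusable workflow once per listed Flink version. A minimal standalone sketch of the same fan-out mechanism (the workflow name and echo step are illustrative, not part of the connector repo):

```yaml
name: matrix-sketch
on: [push]
jobs:
  compile_and_test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        flink: [1.16.1, 1.17.0]   # one job is spawned per entry
    steps:
      - run: echo "building against Flink ${{ matrix.flink }}"
```

Supporting an additional Flink version is then a one-line change to the `flink` list rather than a copy of the whole job.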
svn commit: r61661 - in /dev/flink/flink-connector-cassandra-3.1.0-rc2: ./ flink-connector-cassandra-3.1.0-src.tgz flink-connector-cassandra-3.1.0-src.tgz.asc flink-connector-cassandra-3.1.0-src.tgz.s

2023-05-05 Thread dannycranmer
Author: dannycranmer
Date: Fri May  5 09:27:15 2023
New Revision: 61661

Log:
Add flink-connector-cassandra-3.1.0-rc2

Added:
dev/flink/flink-connector-cassandra-3.1.0-rc2/

dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz
   (with props)

dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc

dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512

Added: 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz
==
Binary file - no diff available.

Propchange: 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz
--
svn:mime-type = application/octet-stream

Added: 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc
==
--- 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc
 (added)
+++ 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.asc
 Fri May  5 09:27:15 2023
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUy+4ACgkQkfnB7BJf
+2Nuffw//fYf/kdvsY6E7PDBK2uh1R/ufc+nzd9F8LDzszVNAdjtHyo192ryOqpVm
+hPuytYIRacIa1IJJenvN5/xKx1Db0hPvM4rxX54GYaBFdHYiUAoDGBbai8ThUp5F
+Qeql37/JN/3dzsCZjETVCVN23CdFT5jxydtUPQQyj+PIwUc6Son4QOtwdRezUI4+
+GsKpGWjRS6ORxeR2Rit+HUUyuXRF1OevXKEbBiT0+DA2370J3a8r8dRJ+Det
+G1YpqrlN8MBjBmBx143zuiCveSXibF8DeHGqDGP3YoZodozpZg/WNKGH6ZhjLM/p
+QBgivejHAsu3Q5V5nuYS6RlqCY/Gr9yGLsAkvprjpU3U/RwYzX3SN6gQedGc5owI
+zsUDNVD0hlCrR96XNKv5sIt+YWbrdoOJ5pUEHqtW7kzErNQqCB8yjYtUQgbajW74
+fNRfdP6VEpxFaP0lBr6E8QBTGtVDhZYYPmY0mCYBXV6z4fhPKwKCXPhR6rQJuCkW
+LZm/QTXch4CVCNcX6vYHuB4VYfpp0Hn/GiTOm7EKXlAMKwlk5Ktgr+ahtrcE3mp8
+ebx+YFNI+XpQpM5bRBRQefAsXth0thq1CSZkCCsGcMksZvKtvBbuikFVYxgX0cpE
+IwE1nxTOy/RjhzVqAI88OSmKUiZSGknIqTyfM3Cz1xu1WUBkeNU=
+=orXf
+-END PGP SIGNATURE-

Added: 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512
==
--- 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512
 (added)
+++ 
dev/flink/flink-connector-cassandra-3.1.0-rc2/flink-connector-cassandra-3.1.0-src.tgz.sha512
 Fri May  5 09:27:15 2023
@@ -0,0 +1 @@
+15394ca5435be1625e5bb634b2aab1f6776981aca660daa1949f3d414a105062a0a9fbcfe39f73b84ffa9d438181888c36eae04d3eb33aa243b503f6a3197b73
  flink-connector-cassandra-3.1.0-src.tgz




[flink-connector-cassandra] annotated tag v3.1.0-rc2 updated (83945fe -> fa0dd03)

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to annotated tag v3.1.0-rc2
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


*** WARNING: tag v3.1.0-rc2 was modified! ***

from 83945fe  (commit)
  to fa0dd03  (tag)
 tagging 83945fe41cb6e7c188dfbf656b04955142600bb2 (commit)
  by Danny Cranmer
  on Fri May 5 10:27:02 2023 +0100

- Log -
vv3.1.0-rc2
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUy+YACgkQkfnB7BJf
2NvODA//a84xAGHxiauAZhqus7NLyzZhqxyiRxGK+fTrtn9DgIAKSsz8aCNNMZw2
4mbwnMRF5wHlJnF31OgzLVWT4pQ4QlIwTAORfXCVFHbrUI0p9T1c3sXmy7RbiJ6L
31FeCnWyRKKuifx9zlhbnaLlG8obR0ujbGiO0DZoE8b+2l/V71PVYv9zEzu73IiL
bQRO4RQz52j8qkH3yiGGaosACcyqHools2D5gOxvEW09hS9ofVLxpKTPom0v5y0d
rWKp9o71dT0tsSipX5hq5v63Lxlb5TbOy+2xGvZiRtufqC8jMyW8NouDK7PiJYMF
kOkGwpstwbjkrqIAkmZ77TcADmaKPliDSrG3IHYSRpdkdMQZLzUVT0+OECXzshhV
ldfmT2ULMlpZAoLagi9JfkopZS3t/0Y8INjhl43TilgP5jdlAOIzKLnH7Z/z3ElL
fj5mKSr9uUlmVNk4hyVLfqloC0tBO8W03+gBc6vJIPLOhGfUFyiGOZrlVoBbGTRB
+l7IzUFUVSmRsc9XnQiF4m7SywyWJzZlOpKlAfGzyDx9oGI29bt8dbWYCHJb9MfW
tqxjNEerD+0VM4CNZpCYHRFbsdrlEgtzxePkD3SGKwLNemwKdBzrtkgsR4VD0MXD
hu/lOrp0AY1UwQtzOVF0vAT/J2/ofklPrnOgtopFecrX6vchZ9k=
=J8Qs
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[flink-connector-cassandra] 01/02: [FLINK-31927] Disable cassandra driver metrics as they are not integrated with Flink metrics

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 92ee0991a268e11c7db25c1aa040bbdebfc697f7
Author: Etienne Chauchot 
AuthorDate: Tue May 2 16:29:45 2023 +0200

[FLINK-31927] Disable cassandra driver metrics as they are not integrated 
with Flink metrics
---
 .../apache/flink/streaming/connectors/cassandra/ClusterBuilder.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
 
b/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
index fb537bc..9a98892 100644
--- 
a/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
+++ 
b/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
@@ -24,12 +24,13 @@ import java.io.Serializable;
 
 /**
  * This class is used to configure a {@link com.datastax.driver.core.Cluster} after deployment. The
- * cluster represents the connection that will be established to Cassandra.
+ * cluster represents the connection that will be established to Cassandra. Cassandra driver metrics
+ * are not integrated with Flink metrics, so they are disabled.
  */
 public abstract class ClusterBuilder implements Serializable {
 
     public Cluster getCluster() {
-        return buildCluster(Cluster.builder());
+        return buildCluster(Cluster.builder().withoutMetrics());
     }
 
     /**



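The fix above calls `withoutMetrics()` on the driver's builder before the user-supplied `buildCluster` hook ever sees it, so every cluster the connector builds has driver metrics disabled centrally. A self-contained sketch of that idea, using stand-in types rather than the Datastax API:

```java
import java.util.function.Function;

// Stand-in builder and cluster, not the Datastax driver API: the point is
// that the framework pre-configures the builder before handing it to user code.
public class MetricsSketch {

    public static final class Cluster {
        public final boolean metricsEnabled;
        Cluster(boolean metricsEnabled) {
            this.metricsEnabled = metricsEnabled;
        }
    }

    public static final class Builder {
        private boolean metrics = true; // assumed default: metrics collection on

        public Builder withoutMetrics() {
            this.metrics = false;
            return this;
        }

        public Cluster build() {
            return new Cluster(metrics);
        }
    }

    // Mirrors ClusterBuilder#getCluster: switch metrics off centrally,
    // then let the user-supplied hook finish configuration.
    public static Cluster getCluster(Function<Builder, Cluster> userBuild) {
        return userBuild.apply(new Builder().withoutMetrics());
    }

    public static void main(String[] args) {
        Cluster cluster = getCluster(Builder::build);
        System.out.println("metrics enabled: " + cluster.metricsEnabled); // prints "metrics enabled: false"
    }
}
```

Centralizing the toggle this way is why the follow-up commit can delete the per-test `withoutMetrics()` calls: user code no longer needs to remember it.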
[flink-connector-cassandra] 02/02: [FLINK-31927] Use default clusterBuilder reporting configuration in the tests

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 3170f2a5c3b8e6b520dd4db55f8795a817266261
Author: Etienne Chauchot 
AuthorDate: Tue May 2 16:52:52 2023 +0200

[FLINK-31927] Use default clusterBuilder reporting configuration in the 
tests
---
 .../apache/flink/connector/cassandra/CassandraTestEnvironment.java   | 2 --
 .../flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java  | 5 +
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 15b98d2..a0e649c 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -173,8 +173,6 @@ public class CassandraTestEnvironment implements TestResource {
                                         // default timeout x3 and higher than
                                         // request_timeout_in_ms at the cluster level
                                         .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
-                        .withoutJMXReporting()
-                        .withoutMetrics()
                         .build();
             }
         };
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 560ea2a..7b81e57 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -62,10 +62,7 @@ class CassandraSinkBaseTest {
                 new ClusterBuilder() {
                     @Override
                     protected Cluster buildCluster(Cluster.Builder builder) {
-                        return builder.addContactPoint("127.0.0.1")
-                                .withoutJMXReporting()
-                                .withoutMetrics()
-                                .build();
+                        return builder.addContactPoint("127.0.0.1").build();
                     }
                 },
                 CassandraSinkBaseConfig.newBuilder().build(),



[flink-connector-cassandra] branch v3.1 updated (ffe30a0 -> 3170f2a)

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


from ffe30a0  Update version to 3.1-SNAPSHOT
 new 92ee099  [FLINK-31927] Disable cassandra driver metrics as they are 
not integrated with Flink metrics
 new 3170f2a  [FLINK-31927] Use default clusterBuilder reporting 
configuration in the tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/streaming/connectors/cassandra/ClusterBuilder.java  | 5 +++--
 .../apache/flink/connector/cassandra/CassandraTestEnvironment.java   | 2 --
 .../flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java  | 5 +
 3 files changed, 4 insertions(+), 8 deletions(-)



[flink] branch master updated (00b1d4cf880 -> 43528be1b10)

2023-05-05 Thread dmvk
This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 00b1d4cf880 [FLINK-31443][runtime] Maintain redundant task managers to 
speed up failover
 add 43528be1b10 [FLINK-32000][webui] Expose max parallelism in the vertex 
detail drawer.

No new revisions were added by this update.

Summary of changes:
 .../overview/detail/job-overview-drawer-detail.component.html | 11 +++
 .../overview/detail/job-overview-drawer-detail.component.ts   |  4 +++-
 2 files changed, 14 insertions(+), 1 deletion(-)



svn commit: r61660 - /release/flink/flink-connector-jdbc-3.0.0/

2023-05-05 Thread dannycranmer
Author: dannycranmer
Date: Fri May  5 09:03:00 2023
New Revision: 61660

Log:
Remove old release for Apache Flink Connector JDBC 3.0.0

Removed:
release/flink/flink-connector-jdbc-3.0.0/



[flink-web] branch asf-site updated: Rebuild website

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new a73c47e2b Rebuild website
a73c47e2b is described below

commit a73c47e2bd27266e6ed571b89e239a1afb06575a
Author: Danny Cranmer 
AuthorDate: Fri May 5 09:50:53 2023 +0100

Rebuild website
---
 content/downloads/index.html| 20 +---
 content/zh/downloads/index.html | 20 +---
 2 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/content/downloads/index.html b/content/downloads/index.html
index 9aff01046..77a94f8c6 100644
--- a/content/downloads/index.html
+++ b/content/downloads/index.html
@@ -908,7 +908,7 @@ https://github.com/alex-shpak/hugo-book
 Apache Flink AWS 
Connectors 4.1.0
 Apache Flink 
Cassandra Connector 3.0.0
 Apache Flink 
Elasticsearch Connector 3.0.0
-Apache Flink JDBC 
Connector 3.0.0
+Apache Flink JDBC 
Connector 3.1.0
 Apache Flink Kafka 
Connector 3.0.0
 Apache Flink MongoDB 
Connector 1.0.1
 Apache Flink 
Opensearch Connector 1.0.0
@@ -1087,14 +1087,19 @@ under the License.
 
 1.16.x
 
-
-  Apache Flink JDBC Connector 3.0.0
-  #
+
+  Apache Flink JDBC Connector 3.1.0
+  #
 
-https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz;>Apache
 Flink JDBC Connector 3.0.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc;>(asc,
 https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512;>sha512)
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Apache
 Flink JDBC Connector 3.1.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc;>(asc,
 https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.sha512;>sha512)
 This component is compatible with Apache Flink version(s):
 
-1.16.x
+
+1.16.x
+
+
+1.17.x
+
 
 
   Apache Flink Kafka Connector 3.0.0
@@ -1520,6 +1525,7 @@ The statefun-flink-harness dependency 
includes a local execution en
 Flink AWS Connectors 4.1.0 - 2023-04-03 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-aws-4.1.0/flink-connector-aws-4.1.0-src.tgz;>Source)
 Flink Kafka Connector 3.0.0 - 2023-04-21 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.0.0/flink-connector-kafka-3.0.0-src.tgz;>Source)
 Flink MongoDB Connector 1.0.1 - 2023-04-24 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-mongodb-1.0.1/flink-connector-mongodb-1.0.1-src.tgz;>Source)
+Flink JDBC Connector 3.1.0 - 2023-05-05 (https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Source)
 
 
   Apache Flink Stateful Functions
@@ -1642,7 +1648,7 @@ The statefun-flink-harness dependency 
includes a local execution en
 Apache Flink AWS 
Connectors 4.1.0
 Apache Flink 
Cassandra Connector 3.0.0
 Apache Flink 
Elasticsearch Connector 3.0.0
-Apache Flink JDBC 
Connector 3.0.0
+Apache Flink JDBC 
Connector 3.1.0
 Apache Flink Kafka 
Connector 3.0.0
 Apache Flink MongoDB 
Connector 1.0.1
 Apache Flink 
Opensearch Connector 1.0.0
diff --git a/content/zh/downloads/index.html b/content/zh/downloads/index.html
index f75ab3adc..905cbc2df 100644
--- a/content/zh/downloads/index.html
+++ b/content/zh/downloads/index.html
@@ -908,7 +908,7 @@ https://github.com/alex-shpak/hugo-book
 Apache Flink AWS 
Connectors 4.1.0
 Apache Flink 
Cassandra Connector 3.0.0
 Apache Flink 
Elasticsearch Connector 3.0.0
-Apache Flink JDBC 
Connector 3.0.0
+Apache Flink JDBC 
Connector 3.1.0
 Apache Flink Kafka 
Connector 3.0.0
 Apache Flink MongoDB 
Connector 1.0.1
 Apache Flink 
Opensearch Connector 1.0.0
@@ -1087,14 +1087,19 @@ under the License.
 
 1.16.x
 
-
-  Apache Flink JDBC Connector 3.0.0
-  #
+
+  Apache Flink JDBC Connector 3.1.0
+  #
 
-https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz;>Apache
 Flink JDBC Connector 3.0.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc;>(asc,
 https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512;>sha512)
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;>Apache
 Flink JDBC Connector 3.1.0 Source Release https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc;>(asc,
 

[flink] branch master updated (8d430f51d77 -> 00b1d4cf880)

2023-05-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 8d430f51d77 [FLINK-31995][tests] Adds shutdown check to 
DirectExecutorService
 add 2cabb157a4f [hotfix] Keep all calculations and deriving in 
SlotManagerConfiguration#fromConfiguration
 add 92f8666ec1d [FLINK-31443][runtime] Only check idle timeout when 
resource allocator is supported.
 add 62d3a47a990 [FLINK-31443][runtime] let ResourceAllocationStrategy 
decide which pending/idle task managers should be release.
 add 8a2b2729b9f [FLINK-31443][runtime] Maintain 
pendingSlotAllocationRecords in PendingTaskManager
 add 00b1d4cf880 [FLINK-31443][runtime] Maintain redundant task managers to 
speed up failover

No new revisions were added by this update.

Summary of changes:
 .../generated/resource_manager_configuration.html  |   2 +-
 .../configuration/ResourceManagerOptions.java  |   9 +-
 .../ResourceManagerRuntimeServices.java|   4 +-
 .../DefaultResourceAllocationStrategy.java | 137 ++-
 .../slotmanager/FineGrainedSlotManager.java|  98 ++-
 .../slotmanager/FineGrainedTaskManagerTracker.java |  41 ++---
 .../slotmanager/PendingTaskManager.java|  41 +
 .../slotmanager/ResourceAllocationStrategy.java|  13 ++
 .../slotmanager/ResourceReleaseResult.java |  70 
 .../slotmanager/SlotManagerConfiguration.java  |  11 +-
 .../slotmanager/TaskExecutorManager.java   |  41 +++--
 .../TaskManagerResourceInfoProvider.java   |  11 --
 .../apache/flink/runtime/util/ResourceCounter.java |  11 ++
 .../slotmanager/DeclarativeSlotManagerBuilder.java |   3 +
 .../DefaultResourceAllocationStrategyTest.java | 193 -
 ...gerDefaultResourceAllocationStrategyITCase.java | 101 ++-
 .../slotmanager/FineGrainedSlotManagerTest.java| 112 ++--
 .../FineGrainedTaskManagerTrackerTest.java |  46 +++--
 .../SlotManagerConfigurationBuilder.java   |   3 +
 .../TestingResourceAllocationStrategy.java |  30 +++-
 .../slotmanager/TestingTaskManagerInfo.java|  10 +-
 .../TestingTaskManagerResourceInfoProvider.java|  27 ---
 22 files changed, 746 insertions(+), 268 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceReleaseResult.java



[flink-web] branch asf-site updated: Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 7b3bf5301 Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x
7b3bf5301 is described below

commit 7b3bf530181ee46dc206062b6f63576d42255649
Author: Danny Cranmer 
AuthorDate: Thu Apr 13 15:32:26 2023 +0100

Add JDBC v3.1.0 for Flink 1.16.x and Flink 1.17.x
---
 docs/data/flink_connectors.yml | 10 +-
 docs/data/release_archive.yml  |  5 +
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/docs/data/flink_connectors.yml b/docs/data/flink_connectors.yml
index c210b3800..4c5eb7282 100644
--- a/docs/data/flink_connectors.yml
+++ b/docs/data/flink_connectors.yml
@@ -44,11 +44,11 @@ elasticsearch:
   compatibility: ["1.16.x"]
   
 jdbc:
-  name: "Apache Flink JDBC Connector 3.0.0"
-  source_release_url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz;
-  source_release_asc_url: 
"https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.asc;
-  source_release_sha512_url: 
"https://downloads.apache.org/flink/flink-connector-jdbc-3.0.0/flink-connector-jdbc-3.0.0-src.tgz.sha512;
-  compatibility: ["1.16.x"]
+  name: "Apache Flink JDBC Connector 3.1.0"
+  source_release_url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz;
+  source_release_asc_url: 
"https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.asc;
+  source_release_sha512_url: 
"https://downloads.apache.org/flink/flink-connector-jdbc-3.1.0/flink-connector-jdbc-3.1.0-src.tgz.sha512;
+  compatibility: ["1.16.x", "1.17.x"]
 
 kafka:
   name: "Apache Flink Kafka Connector 3.0.0"
diff --git a/docs/data/release_archive.yml b/docs/data/release_archive.yml
index beb63d9af..3998e3b71 100644
--- a/docs/data/release_archive.yml
+++ b/docs/data/release_archive.yml
@@ -448,6 +448,11 @@ release_archive:
   version: 1.0.1
   release_date: 2023-04-24
   filename: "mongodb"
+- name: "Flink JDBC Connector"
+  connector: "jdbc"
+  version: 3.1.0
+  release_date: 2023-05-05
+  filename: "jdbc"
 
   flink_shaded:
 -



[flink-connector-jdbc] annotated tag v3.1.0 updated (f6f1820 -> e56a23f)

2023-05-05 Thread dannycranmer
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to annotated tag v3.1.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


*** WARNING: tag v3.1.0 was modified! ***

from f6f1820  (commit)
  to e56a23f  (tag)
 tagging f6f1820de2d11febc59539e1b1339f7350532312 (commit)
  by Danny Cranmer
  on Fri May 5 09:21:42 2023 +0100

- Log -
vv3.1.0
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiEED3nyr7I1G8KWeFRFkfnB7BJf2NsFAmRUvJgACgkQkfnB7BJf
2Nsjxw//eNpdnKwgMMI0Ia1axsfeZeI73fRV/76Xb4+89gRhl2Ty3fd7LmF+ulDx
n5AWUCVUeus4xvV5oJqvJ101iQsjhEbSv76W4mH3pFWSYwONd4divYXqg9bzhE0/
saOk6w5/u9mIEnDVM/3ShAVZkvQW7+IZY6u5ZjZFavmC6YUtNPRk9XDpkhJP5cmU
bUQRPT1YyFcMpaBL1qLI6r0G2wYlxhTc00OwjQvH62hBe/DUmJbe6la/7Ri+c/DP
68ZDT2gGUpZ46dxdR1fpsHCqMDM6KwY5GNx1r9N8svoo/9AWF5iYk51jpQKDzYfR
2qEV6Ca9ZmTGHg73VNNCKIZEyCOCvFtSbb3Evf4dPNAMq/Ng9VflJ1iT3u43nzF5
0GzlOlue+kst4746TbNBHb9o15oZfALT1vdVpTrbFMuwWEC/4NvQAPIpAFJPxFRR
S/VzyXGWGh1lL5z5zO9rGy1lpmEjnq51LkgiGpcqf9P93CP/KWktic1NXlpBjVx5
WAHuOnYjS0sVd52lKWVaZc7Dx52ozz0chx7G7O5liJeI/9/uWn3vnD8DrpPtXVCO
oXqIfCoWZv+EMVaHBgzgmhPHuUtF4Z5t6poFuVquHanx9RGr6l9vvSciBeqyworT
h8b2B69x5sj8/4GrPJu25noX8O9pB8Vu0+Tj6WCrMDcoR33K0IE=
=3Fze
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



svn commit: r61657 - /dev/flink/flink-connector-jdbc-3.1.0-rc1/ /release/flink/flink-connector-jdbc-3.1.0/

2023-05-05 Thread dannycranmer
Author: dannycranmer
Date: Fri May  5 08:21:15 2023
New Revision: 61657

Log:
Release flink-connector-jdbc 3.1.0

Added:
release/flink/flink-connector-jdbc-3.1.0/
  - copied from r61656, dev/flink/flink-connector-jdbc-3.1.0-rc1/
Removed:
dev/flink/flink-connector-jdbc-3.1.0-rc1/



[flink] 01/03: [FLINK-18808][streaming] Introduce helper builder class for StreamConfigChainer.

2023-05-05 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49a320688c46e9f1992b1486e1dff08be876e6ac
Author: Weijie Guo 
AuthorDate: Fri May 5 15:06:54 2023 +0800

[FLINK-18808][streaming] Introduce helper builder class for 
StreamConfigChainer.
---
 .../runtime/tasks/StreamConfigChainer.java | 242 +++--
 1 file changed, 218 insertions(+), 24 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 100340a6ac5..7411a3e2fd9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
@@ -36,13 +37,17 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.OutputTag;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -58,6 +63,10 @@ public class StreamConfigChainer {
 private StreamConfig tailConfig;
 private int chainIndex = MAIN_NODE_ID;
 
+private final List> outEdgesInOrder = new 
LinkedList<>();
+
+private boolean setTailNonChainedOutputs = true;
+
 StreamConfigChainer(
 OperatorID headOperatorID,
 StreamConfig headConfig,
@@ -77,6 +86,11 @@ public class StreamConfigChainer {
 headConfig.setChainIndex(chainIndex);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 public  StreamConfigChainer chain(
 OperatorID operatorID,
 OneInputStreamOperator operator,
@@ -85,11 +99,21 @@ public class StreamConfigChainer {
 return chain(operatorID, operator, typeSerializer, typeSerializer, 
createKeyedStateBackend);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 public  StreamConfigChainer chain(
 OneInputStreamOperator operator, TypeSerializer 
typeSerializer) {
 return chain(new OperatorID(), operator, typeSerializer);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 public  StreamConfigChainer chain(
 OperatorID operatorID,
 OneInputStreamOperator operator,
@@ -97,11 +121,21 @@ public class StreamConfigChainer {
 return chain(operatorID, operator, typeSerializer, typeSerializer, 
false);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 public  StreamConfigChainer chain(
 OneInputStreamOperatorFactory operatorFactory, 
TypeSerializer typeSerializer) {
 return chain(new OperatorID(), operatorFactory, typeSerializer);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 public  StreamConfigChainer chain(
 OperatorID operatorID,
 OneInputStreamOperatorFactory operatorFactory,
@@ -109,6 +143,11 @@ public class StreamConfigChainer {
 return chain(operatorID, operatorFactory, typeSerializer, 
typeSerializer, false);
 }
 
+/**
+ * @deprecated Use {@link #chain(TypeSerializer)} or {@link 
#chain(TypeSerializer,
+ * TypeSerializer)} instead.
+ */
+@Deprecated
 private  StreamConfigChainer chain(
 OperatorID operatorID,
 OneInputStreamOperator operator,
@@ -123,6 +162,11 @@ public class StreamConfigChainer {
 createKeyedStateBackend);
 }
 

[flink] branch FLINK-18808-record-out created (now 9be2171a8fb)

2023-05-05 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git


  at 9be2171a8fb [FLINK-18808][streaming] Include side outputs in 
numRecordsOut metric

This branch includes the following new commits:

 new 49a320688c4 [FLINK-18808][streaming] Introduce helper builder class 
for StreamConfigChainer.
 new d5aa8a5ea2a [FLINK-18808][streaming] Introduce 
OutputWithRecordsCountCheck.
 new 9be2171a8fb [FLINK-18808][streaming] Include side outputs in 
numRecordsOut metric

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[flink] 02/03: [FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck.

2023-05-05 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5aa8a5ea2a9920e6368089cd8f82f0344ace6ef
Author: Weijie Guo 
AuthorDate: Wed Apr 26 18:03:07 2023 +0800

[FLINK-18808][streaming] Introduce OutputWithRecordsCountCheck.
---
 .../streaming/runtime/io/RecordWriterOutput.java   | 30 +---
 .../streaming/runtime/tasks/ChainingOutput.java| 17 -
 .../runtime/tasks/OutputWithChainingCheck.java | 42 ++
 3 files changed, 83 insertions(+), 6 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 93acf23d30b..46fc01231be 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck;
 import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.OutputTag;
@@ -43,7 +44,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Implementation of {@link Output} that sends data using a {@link RecordWriter}. */
 @Internal
-public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
+public class RecordWriterOutput<OUT>
+        implements WatermarkGaugeExposingOutput<StreamRecord<OUT>>,
+                OutputWithChainingCheck<StreamRecord<OUT>> {
 
     private RecordWriter<SerializationDelegate<StreamElement>> recordWriter;
 
@@ -83,19 +86,36 @@ public class RecordWriterOutput<OUT>
 
     @Override
     public void collect(StreamRecord<OUT> record) {
+        collectAndCheckIfCountNeeded(record);
+    }
+
+    @Override
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
+        collectAndCheckIfCountNeeded(outputTag, record);
+    }
+
+    @Override
+    public boolean collectAndCheckIfCountNeeded(StreamRecord<OUT> record) {
         if (this.outputTag != null) {
             // we are not responsible for emitting to the main output.
-            return;
+            return false;
         }
 
         pushToRecordWriter(record);
+        return true;
     }
 
     @Override
-    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
-            pushToRecordWriter(record);
+    public <X> boolean collectAndCheckIfCountNeeded(
+            OutputTag<X> outputTag, StreamRecord<X> record) {
+        if (!OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
+            // we are not responsible for emitting to the side-output specified by this
+            // OutputTag.
+            return false;
         }
+
+        pushToRecordWriter(record);
+        return true;
     }
 
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
index 0ff3785056f..5b2cf742442 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
@@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+class ChainingOutput<T>
+        implements WatermarkGaugeExposingOutput<StreamRecord<T>>,
+                OutputWithChainingCheck<StreamRecord<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(ChainingOutput.class);
 
     protected final Input<T> input;
@@ -82,6 +84,19 @@ class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
         }
     }
 
+    @Override
+    public boolean collectAndCheckIfCountNeeded(StreamRecord<T> record) {
+        collect(record);
+        return false;
+    }
+
+    @Override
+    public <X> boolean collectAndCheckIfCountNeeded(
+            OutputTag<X> outputTag, StreamRecord<X> record) {
+        collect(outputTag, record);
+        return false;
+    }
+
     protected <X> void pushToOperator(StreamRecord<X> record) {
         try {
             // we know that the given outputTag matches our OutputTag so the record
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputWithChainingCheck.java
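The 42-line body of the new OutputWithChainingCheck interface is not reproduced in this mail. As a rough, self-contained sketch of the idea it introduces (simplified, hypothetical names rather than Flink's actual API): each collect call reports back whether the emitted record still needs to be counted toward the task-level numRecordsOut metric, so records that merely move between chained operators inside the same task are not double-counted.

```java
import java.util.ArrayList;
import java.util.List;

class ChainingCheckSketch {
    // Simplified stand-in for the OutputWithChainingCheck contract: collect
    // reports whether the caller still needs to count the emitted record.
    interface OutputWithCountCheck<T> {
        boolean collectAndCheckIfCountNeeded(T record);
    }

    // Records leaving the task (e.g. over the network) have not been seen by
    // the task-level counter yet, so return true.
    static class NetworkOutput<T> implements OutputWithCountCheck<T> {
        final List<T> written = new ArrayList<>();

        public boolean collectAndCheckIfCountNeeded(T record) {
            written.add(record);
            return true;
        }
    }

    // Records forwarded to a chained operator stay inside the task and are
    // accounted for elsewhere, so return false.
    static class ChainedOutput<T> implements OutputWithCountCheck<T> {
        final List<T> forwarded = new ArrayList<>();

        public boolean collectAndCheckIfCountNeeded(T record) {
            forwarded.add(record);
            return false;
        }
    }

    public static void main(String[] args) {
        List<OutputWithCountCheck<String>> outputs = new ArrayList<>();
        outputs.add(new NetworkOutput<>());
        outputs.add(new ChainedOutput<>());

        long numRecordsOut = 0;
        for (OutputWithCountCheck<String> out : outputs) {
            if (out.collectAndCheckIfCountNeeded("record-1")) {
                numRecordsOut++;
            }
        }

        // Only the network output contributed to the task-level count.
        if (numRecordsOut != 1) {
            throw new AssertionError("expected 1 but got " + numRecordsOut);
        }
        System.out.println("numRecordsOut=" + numRecordsOut);
    }
}
```

The boolean return value is what lets a broadcasting collector emit to several outputs and still bump the task counter at most once per record.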
 

[flink] 03/03: [FLINK-18808][streaming] Include side outputs in numRecordsOut metric

2023-05-05 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch FLINK-18808-record-out
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9be2171a8fbeac72e34def779e1003bf099ab802
Author: Weijie Guo 
AuthorDate: Wed Apr 26 19:39:51 2023 +0800

[FLINK-18808][streaming] Include side outputs in numRecordsOut metric

Co-authored-by: 李明 
---
 .../streaming/runtime/io/RecordWriterOutput.java   |  18 ++-
 .../runtime/tasks/BroadcastingOutputCollector.java |  24 +++-
 .../tasks/CopyingBroadcastingOutputCollector.java  |  41 ---
 .../streaming/runtime/tasks/OperatorChain.java |  39 --
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 131 +
 .../runtime/tasks/OneInputStreamTaskTest.java  |  88 ++
 .../streaming/runtime/tasks/OperatorChainTest.java |   5 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java  | 103 
 8 files changed, 415 insertions(+), 34 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 46fc01231be..6d1ae671153 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -60,6 +62,10 @@ public class RecordWriterOutput
 
 private WatermarkStatus announcedStatus = WatermarkStatus.ACTIVE;
 
+// Uses a dummy counter here to avoid checking the existence of numRecordsOut on the
+// per-record path.
+private Counter numRecordsOut = new SimpleCounter();
+
 @SuppressWarnings("unchecked")
 public RecordWriterOutput(
            RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
@@ -86,12 +92,16 @@ public class RecordWriterOutput
 
     @Override
     public void collect(StreamRecord<OUT> record) {
-        collectAndCheckIfCountNeeded(record);
+        if (collectAndCheckIfCountNeeded(record)) {
+            numRecordsOut.inc();
+        }
     }
 
     @Override
     public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
-        collectAndCheckIfCountNeeded(outputTag, record);
+        if (collectAndCheckIfCountNeeded(outputTag, record)) {
+            numRecordsOut.inc();
+        }
     }
 
     @Override
@@ -168,6 +178,10 @@ public class RecordWriterOutput
         }
     }
 
+    public void setNumRecordsOut(Counter numRecordsOut) {
+        this.numRecordsOut = checkNotNull(numRecordsOut);
+    }
+
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
         if (isPriorityEvent
                 && event instanceof CheckpointBarrier
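The RecordWriterOutput change above initializes numRecordsOut with a throwaway SimpleCounter so the per-record hot path never has to check whether a metric has been registered, and setNumRecordsOut later swaps in the real counter. A minimal sketch of that null-object pattern, using simplified stand-in types rather than Flink's actual metrics classes:

```java
class DummyCounterSketch {
    interface Counter {
        void inc();
        long getCount();
    }

    static class SimpleCounter implements Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    static class RecordOutput {
        // Start with a cheap dummy counter so collect() never branches on
        // "is a metric registered yet?".
        private Counter numRecordsOut = new SimpleCounter();

        void setNumRecordsOut(Counter counter) {
            this.numRecordsOut = counter;
        }

        void collect(String record) {
            // ... serialization and writing elided ...
            numRecordsOut.inc();
        }
    }

    public static void main(String[] args) {
        RecordOutput out = new RecordOutput();
        out.collect("a"); // counted only by the throwaway dummy

        Counter metric = new SimpleCounter();
        out.setNumRecordsOut(metric); // real metric registered later
        out.collect("b");
        out.collect("c");

        // Only records emitted after registration show up in the metric.
        if (metric.getCount() != 2) {
            throw new AssertionError("expected 2 but got " + metric.getCount());
        }
        System.out.println("metric count=" + metric.getCount());
    }
}
```

The trade-off is that records emitted before the real counter is installed are silently dropped from the metric, which is acceptable here because registration happens during operator setup, before any records flow.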
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
index 344215f0e51..2ddc8781eb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/BroadcastingOutputCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -31,12 +32,15 @@ import java.util.Random;
 
 class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
-    protected final Output<StreamRecord<T>>[] outputs;
+    protected final OutputWithChainingCheck<StreamRecord<T>>[] outputs;
     private final Random random = new XORShiftRandom();
     private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+    protected final Counter numRecordsOutForTask;
 
-    public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+    public BroadcastingOutputCollector(
+            OutputWithChainingCheck<StreamRecord<T>>[] outputs, Counter numRecordsOutForTask) {
         this.outputs = outputs;
+        this.numRecordsOutForTask = numRecordsOutForTask;
     }
 
     @Override
@@ -73,15 +77,23 @@ class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>>
 
     @Override
     public void collect(StreamRecord<T> record) {
-        for (Output<StreamRecord<T>> output : outputs) {
-            output.collect(record);
+        boolean emitted = 

[flink] branch release-1.17 updated: [FLINK-31834][Azure] Free up disk space before caching

2023-05-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 91dfb22e0bc [FLINK-31834][Azure] Free up disk space before caching
91dfb22e0bc is described below

commit 91dfb22e0bc7ac10a9a9f59cd9da6d62a723dadd
Author: Robert Metzger 
AuthorDate: Tue Apr 18 16:45:46 2023 +0200

[FLINK-31834][Azure] Free up disk space before caching
---
 tools/azure-pipelines/e2e-template.yml | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tools/azure-pipelines/e2e-template.yml b/tools/azure-pipelines/e2e-template.yml
index d9e90045947..c0cb74674d0 100644
--- a/tools/azure-pipelines/e2e-template.yml
+++ b/tools/azure-pipelines/e2e-template.yml
@@ -44,6 +44,10 @@ jobs:
   echo "##vso[task.setvariable variable=skip;]0"
 fi
   displayName: Check if Docs only PR
+# free up disk space before running anything caching related. Caching has proven to fail in the past, due to lacking disk space.
+- script: ./tools/azure-pipelines/free_disk_space.sh
+  target: host
+  displayName: Free up disk space
 # the cache task does not create directories on a cache miss, and can later fail when trying to tar the directory if the test haven't created it
 # this may for example happen if a given directory is only used by a subset of tests, which are run in a different 'group'
 - bash: |
@@ -96,8 +100,6 @@ jobs:
 source ./tools/ci/maven-utils.sh
 setup_maven
 
-echo "Free up disk space"
-./tools/azure-pipelines/free_disk_space.sh
 
 # the APT mirrors access is based on a proposal from https://github.com/actions/runner-images/issues/7048#issuecomment-1419426054
 echo "Configure APT mirrors"