[flink] 02/02: [FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to using ImmutableSet incorrectly

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95be1e5abfe78d7ed8d171a779f280277f51569c
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Mon Dec 5 20:58:54 2022 +0800

[FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to 
using ImmutableSet incorrectly

This closes #21420
---
 .../threadinfo/JobVertexThreadInfoTracker.java |  2 +-
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 29 +-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 57072263b73..6933655c0b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -269,7 +269,7 @@ public class JobVertexThreadInfoTracker implements JobVert
 
 ExecutionAttemptID attemptId = execution.getAttemptId();
 groupedAttemptIds.add(attemptId);
-executionAttemptsByLocation.put(tmLocation, 
ImmutableSet.copyOf(groupedAttemptIds));
+executionAttemptsByLocation.put(tmLocation, groupedAttemptIds);
 }
 }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index ec6855cafa1..f7d4ebfd288 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -20,16 +20,21 @@ package org.apache.flink.runtime.webmonitor.threadinfo;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.messages.ThreadInfoSample;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.JvmUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -49,6 +54,7 @@ import javax.annotation.Nonnull;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +71,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createScheduler;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for the {@link JobVertexThreadInfoTracker}. */
@@ -74,6 +82,12 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 private static final int REQUEST_ID = 0;
 private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = 
createExecutionJobVertex();
 private static final ExecutionVertex[] TASK_VERTICES = 
EXECUTION_JOB_VERTEX.getTaskVertices();
+private static final Set ATTEMPT_IDS =
+Arrays.stream(TASK_VERTICES)
+.map(
+executionVertex ->
+
executionVertex.getCurrentExecutionAttempt().getAttemptId())
+.collect(Collectors.toSet());
 private static final JobID JOB_ID = new JobID();
 
 private static ThreadInfoSample threadInfoSample;
@@ -338,8 +352,18 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 private static 

[flink] 01/02: [hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea9491756f8877ec54af1b55e8e144cfeb315d3d
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Wed Nov 30 00:26:00 2022 +0800

[hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.
---
 .../runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java   | 6 --
 .../webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java   | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 45b469632b1..57072263b73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -274,7 +274,9 @@ public class JobVertexThreadInfoTracker implements JobVert
 }
 
 return executionAttemptsByLocation.entrySet().stream()
-.collect(Collectors.toMap(e -> e.getKey(), e -> ImmutableSet.copyOf(e.getValue())));
+.collect(
+Collectors.toMap(
Map.Entry::getKey, e -> ImmutableSet.copyOf(e.getValue())));
 }
 
 @VisibleForTesting
@@ -353,7 +355,7 @@ public class JobVertexThreadInfoTracker implements JobVert
 vertexStatsCache.put(key, 
createStatsFn.apply(threadInfoStats));
 resultAvailableFuture.complete(null);
 } else {
-LOG.debug(
+LOG.error(
 "Failed to gather a thread info sample for {}",
 vertex.getName(),
 throwable);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index 845b5a798f1..ec6855cafa1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -135,7 +135,7 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 Optional result =
 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
 // cached result is returned instead of unusedThreadInfoStats
-assertThat(threadInfoStatsDefaultSample).isEqualTo(result.get());
+assertThat(result).isPresent().hasValue(threadInfoStatsDefaultSample);
 }
 
 /** Tests that cached result is NOT reused after refresh interval. */



[flink] branch master updated (bca57b7a222 -> 95be1e5abfe)

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from bca57b7a222 [FLINK-30245][table-runtime] Fix the NPE thrown when 
filtering decimal(18, x) values after calling DecimalDataUtils.subtract method
 new ea9491756f8 [hotfix][rest] Minor clean-ups in 
JobVertexThreadInfoTracker.
 new 95be1e5abfe [FLINK-30239][rest] Fix the bug that FlameGraph cannot be 
generated due to using ImmutableSet incorrectly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../threadinfo/JobVertexThreadInfoTracker.java |  8 +++---
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 31 --
 2 files changed, 34 insertions(+), 5 deletions(-)



[flink] 02/02: [FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to using ImmutableSet incorrectly

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f98c6aad1f5bc8ad25f7608c419a5e396b8e8ac
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Mon Dec 5 20:58:54 2022 +0800

[FLINK-30239][rest] Fix the bug that FlameGraph cannot be generated due to 
using ImmutableSet incorrectly

This closes #21420
---
 .../threadinfo/JobVertexThreadInfoTracker.java |  2 +-
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 29 +-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 57072263b73..6933655c0b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -269,7 +269,7 @@ public class JobVertexThreadInfoTracker implements JobVert
 
 ExecutionAttemptID attemptId = execution.getAttemptId();
 groupedAttemptIds.add(attemptId);
-executionAttemptsByLocation.put(tmLocation, 
ImmutableSet.copyOf(groupedAttemptIds));
+executionAttemptsByLocation.put(tmLocation, groupedAttemptIds);
 }
 }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index ec6855cafa1..f7d4ebfd288 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -20,16 +20,21 @@ package org.apache.flink.runtime.webmonitor.threadinfo;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.messages.ThreadInfoSample;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.JvmUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -49,6 +54,7 @@ import javax.annotation.Nonnull;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +71,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createScheduler;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for the {@link JobVertexThreadInfoTracker}. */
@@ -74,6 +82,12 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 private static final int REQUEST_ID = 0;
 private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = 
createExecutionJobVertex();
 private static final ExecutionVertex[] TASK_VERTICES = 
EXECUTION_JOB_VERTEX.getTaskVertices();
+private static final Set ATTEMPT_IDS =
+Arrays.stream(TASK_VERTICES)
+.map(
+executionVertex ->
+
executionVertex.getCurrentExecutionAttempt().getAttemptId())
+.collect(Collectors.toSet());
 private static final JobID JOB_ID = new JobID();
 
 private static ThreadInfoSample threadInfoSample;
@@ -338,8 +352,18 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 private s

[flink] 01/02: [hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eef91f8be7beaabae94596bfd36807a4c955f266
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Wed Nov 30 00:26:00 2022 +0800

[hotfix][rest] Minor clean-ups in JobVertexThreadInfoTracker.
---
 .../runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java   | 6 --
 .../webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java   | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
index 45b469632b1..57072263b73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.java
@@ -274,7 +274,9 @@ public class JobVertexThreadInfoTracker implements JobVert
 }
 
 return executionAttemptsByLocation.entrySet().stream()
-.collect(Collectors.toMap(e -> e.getKey(), e -> ImmutableSet.copyOf(e.getValue())));
+.collect(
+Collectors.toMap(
Map.Entry::getKey, e -> ImmutableSet.copyOf(e.getValue())));
 }
 
 @VisibleForTesting
@@ -353,7 +355,7 @@ public class JobVertexThreadInfoTracker implements JobVert
 vertexStatsCache.put(key, 
createStatsFn.apply(threadInfoStats));
 resultAvailableFuture.complete(null);
 } else {
-LOG.debug(
+LOG.error(
 "Failed to gather a thread info sample for {}",
 vertex.getName(),
 throwable);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
index 845b5a798f1..ec6855cafa1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
@@ -135,7 +135,7 @@ public class JobVertexThreadInfoTrackerTest extends 
TestLogger {
 Optional result =
 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
 // cached result is returned instead of unusedThreadInfoStats
-assertThat(threadInfoStatsDefaultSample).isEqualTo(result.get());
+assertThat(result).isPresent().hasValue(threadInfoStatsDefaultSample);
 }
 
 /** Tests that cached result is NOT reused after refresh interval. */



[flink] branch release-1.16 updated (da9e997752c -> 0f98c6aad1f)

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from da9e997752c [FLINK-30291][Connector/DynamoDB] Update docs to render 
DynamoDB connector docs
 new eef91f8be7b [hotfix][rest] Minor clean-ups in 
JobVertexThreadInfoTracker.
 new 0f98c6aad1f [FLINK-30239][rest] Fix the bug that FlameGraph cannot be 
generated due to using ImmutableSet incorrectly

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../threadinfo/JobVertexThreadInfoTracker.java |  8 +++---
 .../threadinfo/JobVertexThreadInfoTrackerTest.java | 31 --
 2 files changed, 34 insertions(+), 5 deletions(-)



[flink-kubernetes-operator] branch main updated: [FLINK-30307] Turn off e2e test error check temporarily

2022-12-06 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new b95e3e41 [FLINK-30307] Turn off e2e test error check temporarily
b95e3e41 is described below

commit b95e3e41fbac940fa281b69d05df32ef02591691
Author: Gabor Somogyi 
AuthorDate: Tue Dec 6 09:59:35 2022 +0100

[FLINK-30307] Turn off e2e test error check temporarily
---
 e2e-tests/utils.sh | 4 
 1 file changed, 4 insertions(+)

diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 8db0059e..81021bc3 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -132,6 +132,10 @@ function retry_times() {
 
 function check_operator_log_for_errors {
   echo "Checking for operator log errors..."
+  #https://issues.apache.org/jira/browse/FLINK-30310
+  echo "Error checking is temporarily turned off."
+  return 0
+
   operator_pod_namespace=$(get_operator_pod_namespace)
   operator_pod_name=$(get_operator_pod_name)
   echo "Operator namespace: ${operator_pod_namespace} pod: 
${operator_pod_name}"



[flink-kubernetes-operator] branch main updated: [FLINK-30151] Remove AuditUtils from error log check whitelist

2022-12-06 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new e0479732 [FLINK-30151] Remove AuditUtils from error log check whitelist
e0479732 is described below

commit e04797321f398e12f16cd0c6a7059b5037863bdf
Author: Gabor Somogyi 
AuthorDate: Tue Dec 6 10:01:41 2022 +0100

[FLINK-30151] Remove AuditUtils from error log check whitelist
---
 e2e-tests/utils.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 81021bc3..32cdd593 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -145,7 +145,6 @@ function check_operator_log_for_errors {
   | grep -v "Failed to submit job to session cluster" 
`#https://issues.apache.org/jira/browse/FLINK-30148` \
   | grep -v "Error during event processing" 
`#https://issues.apache.org/jira/browse/FLINK-30149` \
   | grep -v "REST service in session cluster is bad now" 
`#https://issues.apache.org/jira/browse/FLINK-30150` \
-  | grep -v "AuditUtils" 
`#https://issues.apache.org/jira/browse/FLINK-30151` \
   | grep -v "Error while patching status" 
`#https://issues.apache.org/jira/browse/FLINK-30283` \
   | grep -e "\[\s*ERROR\s*\]" || true)
   if [ -z "${errors}" ]; then



[flink] branch master updated (95be1e5abfe -> 576c312d373)

2022-12-06 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 95be1e5abfe [FLINK-30239][rest] Fix the bug that FlameGraph cannot be 
generated due to using ImmutableSet incorrectly
 add 576c312d373 [FLINK-29420][build] Upgrade Zookeeper to 3.7.1

No new revisions were added by this update.

Summary of changes:
 .../testframe/container/FlinkTestcontainersConfigurator.java  | 2 +-
 pom.xml   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)



[flink-web] branch asf-site updated: Add Cassandra connector 3.0.0

2022-12-06 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new c032a25c3 Add Cassandra connector 3.0.0
c032a25c3 is described below

commit c032a25c3d2af1c38db58c3a7947eaef642a0fc2
Author: Chesnay Schepler 
AuthorDate: Fri Nov 25 10:29:56 2022 +0100

Add Cassandra connector 3.0.0
---
 _config.yml | 13 +
 1 file changed, 13 insertions(+)

diff --git a/_config.yml b/_config.yml
index 28182c00b..2fe52574f 100644
--- a/_config.yml
+++ b/_config.yml
@@ -125,6 +125,15 @@ connector_releases:
 url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz";
 asc_url: 
"https://downloads.apache.org/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz.asc";
 sha512_url: 
"https://downloads.apache.org/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz.sha512";
+  - version: "3.0.0"
+source_release:
+  name: "Apache Flink Cassandra Connector 3.0.0"
+  id: "300-cassandra-connector-download-source"
+  flink_versions:
+- "1.16"
+  url: 
"https://www.apache.org/dyn/closer.lua/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz";
+  asc_url: 
"https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.asc";
+  sha512_url: 
"https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.sha512";
 
 
 flink_statefun_releases:
@@ -646,6 +655,10 @@ release_archive:
 connector: "aws"
 version: 3.0.0
 release_date: 2022-11-28
+  - name: "Flink Cassandra Connector"
+connector: "cassandra"
+version: 3.0.0
+release_date: 2022-11-30
 
 flink_shaded:
   -



[flink-web] branch asf-site updated: Rebuild website

2022-12-06 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 45ec46e09 Rebuild website
45ec46e09 is described below

commit 45ec46e09ff55499687d75778a3673a046fd728d
Author: Chesnay Schepler 
AuthorDate: Tue Dec 6 11:05:55 2022 +0100

Rebuild website
---
 content/downloads.html| 19 +++
 content/zh/downloads.html | 19 +++
 2 files changed, 38 insertions(+)

diff --git a/content/downloads.html b/content/downloads.html
index 9131cd208..374ad6894 100644
--- a/content/downloads.html
+++ b/content/downloads.html
@@ -245,6 +245,7 @@
   Flink 
connectors
   Apache Flink 
Elasticsearch Connector 3.0.0
   Apache Flink AWS Connectors 
3.0.0
+  Apache Flink Cassandra 
Connector 3.0.0
 
   
   Apache Flink Stateful 
Functions 3.2.0
@@ -349,6 +350,19 @@
   
 
 
+Apache Flink Cassandra Connector 
3.0.0
+
+
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz";
 id="300-cassandra-connector-download-source">Apache Flink Cassandra Connector 
3.0.0 Source Release
+(https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.asc";>asc,
 https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.sha512";>sha512)
+
+
+This connector is compatible with these Apache Flink versions:
+
+
+  1.16.x
+
+
 
 
 Apache Flink® Stateful Functions 3.2.0 is the latest stable release for the 
https://flink.apache.org/stateful-functions.html";>Stateful 
Functions component.
@@ -1524,6 +1538,11 @@ Flink AWS Connectors 3.0.0 - 2022-11-28
 (https://archive.apache.org/dist/flink/flink-connector-$-3.0.0/flink-connector-$-3.0.0-src.tgz";>Source)
 
 
+
+Flink Cassandra Connector 3.0.0 - 2022-11-30
+(https://archive.apache.org/dist/flink/flink-connector-$-3.0.0/flink-connector-$-3.0.0-src.tgz";>Source)
+
+
 
 
 Flink-StateFun
diff --git a/content/zh/downloads.html b/content/zh/downloads.html
index 885f29221..c0638f50d 100644
--- a/content/zh/downloads.html
+++ b/content/zh/downloads.html
@@ -243,6 +243,7 @@
   Flink 
connectors
   Apache Flink 
Elasticsearch Connector 3.0.0
   Apache Flink AWS Connectors 
3.0.0
+  Apache Flink Cassandra 
Connector 3.0.0
 
   
   Apache Flink Stateful 
Functions 3.2.0
@@ -346,6 +347,19 @@ 
flink-docs-release-1.15/release-notes/flink-1.15.html">Flink 1.15 的发布说
   
 
 
+Apache Flink Cassandra Connector 
3.0.0
+
+
+https://www.apache.org/dyn/closer.lua/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz";
 id="300-cassandra-connector-download-source">Apache Flink Cassandra Connector 
3.0.0 Source Release
+(https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.asc";>asc,
 https://downloads.apache.org/flink/flink-connector-cassandra-3.0.0/flink-connector-cassandra-3.0.0-src.tgz.sha512";>sha512)
+
+
+This connector is compatible with these Apache Flink versions:
+
+
+  1.16.x
+
+
 
 
 Apache Flink® Stateful Functions 3.2.0 是 https://flink.apache.org/stateful-functions.html";>Stateful Functions 
组件的最新稳定版本.
@@ -1475,6 +1489,11 @@ Flink AWS Connectors 3.0.0 - 2022-11-28
 (https://archive.apache.org/dist/flink/flink-connector-$-3.0.0/flink-connector-$-3.0.0-src.tgz";>Source)
 
 
+
+Flink Cassandra Connector 3.0.0 - 2022-11-30
+(https://archive.apache.org/dist/flink/flink-connector-$-3.0.0/flink-connector-$-3.0.0-src.tgz";>Source)
+
+
 
 
 Flink-shaded



[flink] branch master updated (576c312d373 -> c6e824c955b)

2022-12-06 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 576c312d373 [FLINK-29420][build] Upgrade Zookeeper to 3.7.1
 add c6e824c955b [hotfix] Remove out-dated comment

No new revisions were added by this update.

Summary of changes:
 docs/setup_docs.sh | 2 --
 1 file changed, 2 deletions(-)



[flink] branch master updated (c6e824c955b -> 209df810f13)

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from c6e824c955b [hotfix] Remove out-dated comment
 new a0aa4813b9f [FLINK-29870][runtime] Split ResourceActions to 
ResourceAllocator and ResourceEventListener.
 new 209df810f13 [FLINK-29870][runtime] move ResourceAllocator to 
ActiveResourceManager.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/resourcemanager/ResourceManager.java   | 81 +++
 .../resourcemanager/StandaloneResourceManager.java | 23 +++---
 .../active/ActiveResourceManager.java  | 70 +
 .../slotmanager/DeclarativeSlotManager.java| 25 +++---
 .../slotmanager/FineGrainedSlotManager.java| 53 -
 .../NonSupportedResourceAllocatorImpl.java}| 29 +++
 ...ResourceActions.java => ResourceAllocator.java} | 26 +++
 .../slotmanager/ResourceEventListener.java}| 21 +++--
 .../resourcemanager/slotmanager/SlotManager.java   | 10 ++-
 .../slotmanager/TaskExecutorManager.java   | 37 +
 .../resourcemanager/ResourceManagerTest.java   | 31 +++-
 .../resourcemanager/TestingResourceManager.java| 27 ---
 .../TestingResourceManagerFactory.java | 20 +++--
 .../active/ActiveResourceManagerTest.java  | 71 -
 .../AbstractFineGrainedSlotManagerITCase.java  | 12 +--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 --
 .../slotmanager/DeclarativeSlotManagerTest.java| 91 +++---
 ...gerDefaultResourceAllocationStrategyITCase.java |  7 +-
 .../slotmanager/FineGrainedSlotManagerTest.java| 12 +--
 .../FineGrainedSlotManagerTestBase.java| 10 ++-
 .../slotmanager/TaskExecutorManagerBuilder.java|  8 +-
 .../slotmanager/TaskExecutorManagerTest.java   | 52 ++---
 .../slotmanager/TestingResourceActionsBuilder.java | 72 -
 ...eActions.java => TestingResourceAllocator.java} | 40 --
 .../TestingResourceAllocatorBuilder.java   | 47 +++
 .../slotmanager/TestingResourceEventListener.java  | 48 
 .../TestingResourceEventListenerBuilder.java}  | 30 ---
 .../slotmanager/TestingSlotManager.java|  3 +-
 28 files changed, 526 insertions(+), 470 deletions(-)
 copy 
flink-runtime/src/{test/java/org/apache/flink/runtime/io/disk/NoOpFileChannelManager.java
 => 
main/java/org/apache/flink/runtime/resourcemanager/slotmanager/NonSupportedResourceAllocatorImpl.java}
 (54%)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/{ResourceActions.java
 => ResourceAllocator.java} (69%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/slotpool/DeclarativeSlotPoolFactory.java
 => resourcemanager/slotmanager/ResourceEventListener.java} (63%)
 delete mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/{TestingResourceActions.java
 => TestingResourceAllocator.java} (54%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceAllocatorBuilder.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceEventListener.java
 copy 
flink-runtime/src/test/java/org/apache/flink/runtime/{jobmaster/slotpool/TestingDeclarativeSlotPoolFactory.java
 => resourcemanager/slotmanager/TestingResourceEventListenerBuilder.java} (52%)



[flink] 02/02: [FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 209df810f13e2fcbe5a4ca8bb015a8a5f662adc4
Author: Weihua Hu 
AuthorDate: Thu Nov 3 21:13:36 2022 +0800

[FLINK-29870][runtime] move ResourceAllocator to ActiveResourceManager.

This closes #21233
---
 .../runtime/resourcemanager/ResourceManager.java   | 75 --
 .../resourcemanager/StandaloneResourceManager.java | 23 +++
 .../active/ActiveResourceManager.java  | 70 +++-
 .../slotmanager/FineGrainedSlotManager.java| 25 +---
 .../NonSupportedResourceAllocatorImpl.java}| 34 +-
 .../slotmanager/ResourceAllocator.java | 14 +++-
 .../slotmanager/TaskExecutorManager.java   | 27 
 .../resourcemanager/ResourceManagerTest.java   | 31 +++--
 .../resourcemanager/TestingResourceManager.java| 27 
 .../TestingResourceManagerFactory.java | 20 +++---
 .../active/ActiveResourceManagerTest.java  | 71 ++--
 .../slotmanager/DeclarativeSlotManagerTest.java| 18 ++
 ...gerDefaultResourceAllocationStrategyITCase.java |  7 +-
 .../slotmanager/TaskExecutorManagerTest.java   | 14 +---
 .../slotmanager/TestingResourceAllocator.java  | 23 +--
 .../TestingResourceAllocatorBuilder.java   | 17 +
 16 files changed, 246 insertions(+), 250 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3abd6b6b68d..5d38143974d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -169,6 +169,8 @@ public abstract class ResourceManager
 
 private final AtomicReference latestTokens = new 
AtomicReference<>();
 
+private final ResourceAllocator resourceAllocator;
+
 public ResourceManager(
 RpcService rpcService,
 UUID leaderSessionId,
@@ -234,6 +236,8 @@ public abstract class ResourceManager
 this.startedFuture = new CompletableFuture<>();
 
 this.delegationTokenManager = delegationTokenManager;
+
+this.resourceAllocator = getResourceAllocator();
 }
 
 // 
@@ -267,7 +271,7 @@ public abstract class ResourceManager
 slotManager.start(
 getFencingToken(),
 getMainThreadExecutor(),
-new ResourceAllocatorImpl(),
+resourceAllocator,
 new ResourceEventListenerImpl(),
 blocklistHandler::isBlockedTaskManager);
 
@@ -533,7 +537,8 @@ public abstract class ResourceManager
 
 @Override
 public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
-closeTaskManagerConnection(resourceId, 
cause).ifPresent(ResourceManager.this::stopWorker);
+closeTaskManagerConnection(resourceId, cause)
+.ifPresent(ResourceManager.this::stopWorkerIfSupported);
 }
 
 @Override
@@ -984,10 +989,11 @@ public abstract class ResourceManager
 
taskExecutorResourceId.getStringWithMetadata(;
 }
 
-final WorkerType newWorker = workerStarted(taskExecutorResourceId);
+final Optional newWorkerOptional =
+getWorkerNodeIfAcceptRegistration(taskExecutorResourceId);
 
 String taskExecutorAddress = 
taskExecutorRegistration.getTaskExecutorAddress();
-if (newWorker == null) {
+if (!newWorkerOptional.isPresent()) {
 log.warn(
 "Discard registration from TaskExecutor {} at ({}) because 
the framework did "
 + "not recognize it",
@@ -996,6 +1002,7 @@ public abstract class ResourceManager
 return new TaskExecutorRegistrationRejection(
 "The ResourceManager does not recognize this 
TaskExecutor.");
 } else {
+WorkerType newWorker = newWorkerOptional.get();
 WorkerRegistration registration =
 new WorkerRegistration<>(
 taskExecutorGateway,
@@ -1159,9 +1166,8 @@ public abstract class ResourceManager
 }
 }
 
-protected void releaseResource(InstanceID instanceId, Exception cause) {
+protected WorkerType getWorkerByInstanceId(InstanceID instanceId) {
 WorkerType worker = null;
-
 // TODO: Improve performance by having an index on the instanceId
 for (Map.Entry> entry :
 taskExecutors.entrySet()) {
@@ -1171,18 +1177,7 @@ public abstract class ResourceManager

[flink] 01/02: [FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and ResourceEventListener.

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0aa4813b9fe3db37cabe38fa47f1d1096c879dc
Author: Weihua Hu 
AuthorDate: Thu Nov 3 16:10:49 2022 +0800

[FLINK-29870][runtime] Split ResourceActions to ResourceAllocator and 
ResourceEventListener.
---
 .../runtime/resourcemanager/ResourceManager.java   | 12 ++--
 .../slotmanager/DeclarativeSlotManager.java| 25 +++
 .../slotmanager/FineGrainedSlotManager.java| 32 +
 ...ResourceActions.java => ResourceAllocator.java} | 16 +
 ...urceActions.java => ResourceEventListener.java} | 30 ++--
 .../resourcemanager/slotmanager/SlotManager.java   | 10 +--
 .../slotmanager/TaskExecutorManager.java   | 14 ++--
 .../AbstractFineGrainedSlotManagerITCase.java  | 12 ++--
 .../slotmanager/DeclarativeSlotManagerBuilder.java | 40 +--
 .../slotmanager/DeclarativeSlotManagerTest.java| 83 --
 ...gerDefaultResourceAllocationStrategyITCase.java |  2 +-
 .../slotmanager/FineGrainedSlotManagerTest.java| 12 ++--
 .../FineGrainedSlotManagerTestBase.java| 10 ++-
 .../slotmanager/TaskExecutorManagerBuilder.java|  8 +--
 .../slotmanager/TaskExecutorManagerTest.java   | 38 +-
 ...eActions.java => TestingResourceAllocator.java} | 25 ++-
 ...r.java => TestingResourceAllocatorBuilder.java} | 28 ++--
 .../slotmanager/TestingResourceEventListener.java  | 48 +
 .../TestingResourceEventListenerBuilder.java   | 41 +++
 .../slotmanager/TestingSlotManager.java|  3 +-
 20 files changed, 284 insertions(+), 205 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12684c25247..3abd6b6b68d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -56,7 +56,8 @@ import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
 import 
org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.LogInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
@@ -266,7 +267,8 @@ public abstract class ResourceManager
 slotManager.start(
 getFencingToken(),
 getMainThreadExecutor(),
-new ResourceActionsImpl(),
+new ResourceAllocatorImpl(),
+new ResourceEventListenerImpl(),
 blocklistHandler::isBlockedTaskManager);
 
 delegationTokenManager.start(this);
@@ -1334,7 +1336,7 @@ public abstract class ResourceManager
 }
 }
 
-private class ResourceActionsImpl implements ResourceActions {
+private class ResourceAllocatorImpl implements ResourceAllocator {
 
 @Override
 public void releaseResource(InstanceID instanceId, Exception cause) {
@@ -1348,9 +1350,11 @@ public abstract class ResourceManager
 validateRunsInMainThread();
 return startNewWorker(workerResourceSpec);
 }
+}
 
+private class ResourceEventListenerImpl implements ResourceEventListener {
 @Override
-public void notifyNotEnoughResourcesAvailable(
+public void notEnoughResourceAvailable(
 JobID jobId, Collection 
acquiredResources) {
 validateRunsInMainThread();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
index 71898ae747d..f5e210de073 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java
@@ -69,7 +69,7 @@ public class DeclarativeSlotManager implements SlotManager {
 
 private final SlotTracker slotTracker;
 private final ResourceTracker resourceTracker;
-private final BiFunction
+private final BiFunction
 taskExecutorManagerFactory;
 

[flink-kubernetes-operator] branch main updated: [FLINK-30268] HA metadata and other related errors should not throw DeploymentFailedException

2022-12-06 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 310ff307 [FLINK-30268] HA metadata and other related errors should not 
throw DeploymentFailedException
310ff307 is described below

commit 310ff3072cd6196202ac37a171a896d3359cfc56
Author: pvary 
AuthorDate: Tue Dec 6 14:41:16 2022 +0100

[FLINK-30268] HA metadata and other related errors should not throw 
DeploymentFailedException
---
 .../controller/FlinkDeploymentController.java  | 15 ++
 .../exception/RecoveryFailureException.java| 32 +
 .../deployment/ApplicationReconciler.java  |  4 +-
 .../operator/service/AbstractFlinkService.java |  6 +--
 .../kubernetes/operator/TestingFlinkService.java   |  4 +-
 .../controller/DeploymentRecoveryTest.java | 10 ++--
 .../controller/FlinkDeploymentControllerTest.java  | 55 ++
 .../deployment/ApplicationReconcilerTest.java  |  6 +--
 .../ApplicationReconcilerUpgradeModeTest.java  |  6 +--
 .../operator/service/NativeFlinkServiceTest.java   |  4 +-
 10 files changed, 124 insertions(+), 18 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index b681a4b5..7b9db5a3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import 
org.apache.flink.kubernetes.operator.observer.deployment.FlinkDeploymentObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
@@ -121,6 +122,8 @@ public class FlinkDeploymentController
 }
 statusRecorder.patchAndCacheStatus(flinkApp);
 reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, 
context);
+} catch (RecoveryFailureException rfe) {
+handleRecoveryFailed(flinkApp, rfe);
 } catch (DeploymentFailedException dfe) {
 handleDeploymentFailed(flinkApp, dfe);
 } catch (Exception e) {
@@ -153,6 +156,18 @@ public class FlinkDeploymentController
 EventRecorder.Component.JobManagerDeployment);
 }
 
+private void handleRecoveryFailed(FlinkDeployment flinkApp, 
RecoveryFailureException rfe) {
+LOG.error("Flink recovery failed", rfe);
+ReconciliationUtils.updateForReconciliationError(
+flinkApp, rfe, configManager.getOperatorConfiguration());
+eventRecorder.triggerEvent(
+flinkApp,
+EventRecorder.Type.Warning,
+rfe.getReason(),
+rfe.getMessage(),
+EventRecorder.Component.JobManagerDeployment);
+}
+
 @Override
 public Map prepareEventSources(
 EventSourceContext context) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/RecoveryFailureException.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/RecoveryFailureException.java
new file mode 100644
index ..1b0be67b
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/RecoveryFailureException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

[flink-kubernetes-operator] branch main updated: [FLINK-30154] Allow to set flinkConfiguration in SessionJob

2022-12-06 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 2c207edf [FLINK-30154] Allow to set flinkConfiguration in SessionJob
2c207edf is described below

commit 2c207edf5915f077b92c5b2bc2f0e6cac54daa9d
Author: Gabor Somogyi 
AuthorDate: Tue Dec 6 14:44:40 2022 +0100

[FLINK-30154] Allow to set flinkConfiguration in SessionJob
---
 .../kubernetes/operator/validation/DefaultValidator.java | 12 +---
 .../kubernetes/operator/validation/DefaultValidatorTest.java |  7 +++
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 0df0e593..05fdfaa8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -61,9 +61,6 @@ public class DefaultValidator implements 
FlinkResourceValidator {
 KubernetesConfigOptions.NAMESPACE.key(), 
KubernetesConfigOptions.CLUSTER_ID.key()
 };
 
-private static final Set ALLOWED_FLINK_SESSION_JOB_CONF_KEYS =
-
Set.of(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key());
-
 private static final Set ALLOWED_LOG_CONF_KEYS =
 Set.of(Constants.CONFIG_FILE_LOG4J_NAME, 
Constants.CONFIG_FILE_LOGBACK_NAME);
 
@@ -468,14 +465,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
 return Optional.empty();
 }
 
-for (String key : flinkSessionJobConfig.keySet()) {
-if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {
-return Optional.of(
-String.format(
-"Invalid session job flinkConfiguration key: 
%s. Allowed keys are %s",
-key, ALLOWED_FLINK_SESSION_JOB_CONF_KEYS));
-}
-}
+// Exclude specific keys if they cause issues
 return Optional.empty();
 }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index fe2e3162..a8cb5fa5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -514,12 +514,11 @@ public class DefaultValidatorTest {
 .setFlinkConfiguration(
 Map.of(
 KubernetesOperatorConfigOptions
-
.OPERATOR_RECONCILE_INTERVAL
+
.PERIODIC_SAVEPOINT_INTERVAL
 .key(),
-"60")),
+"1m")),
 flinkDeployment -> {},
-"Invalid session job flinkConfiguration key: 
kubernetes.operator.reconcile.interval."
-+ " Allowed keys are 
[kubernetes.operator.user.artifacts.http.header]");
+null);
 
 testSessionJobValidateWithModifier(
 sessionJob -> {



[flink-connector-pulsar] 13/16: [FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase. (#21252)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit a6ba99792ea4916f73cca5bee4389dcbec2bf180
Author: Yufan Sheng 
AuthorDate: Fri Nov 11 15:14:22 2022 +0800

[FLINK-29830][Connector/Pulsar] Create the topic with schema before 
consuming messages in PulsarSinkITCase. (#21252)
---
 .../connector/pulsar/sink/PulsarSinkITCase.java|  2 ++
 .../pulsar/testutils/function/ControlSource.java   | 41 ++
 .../testutils/runtime/PulsarRuntimeOperator.java   |  4 +++
 3 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 71e778c..0e5a173 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
+import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for using PulsarSink writing to a Pulsar cluster. */
@@ -104,6 +105,7 @@ class PulsarSinkITCase {
 // A random topic with partition 4.
 String topic = randomAlphabetic(8);
 operator().createTopic(topic, 4);
+operator().createSchema(topic, STRING);
 int counts = ThreadLocalRandom.current().nextInt(100, 200);
 
 ControlSource source =
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
index 3684167..127e267 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/function/ControlSource.java
@@ -31,8 +31,12 @@ import org.apache.flink.testutils.junit.SharedReference;
 
 import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +49,12 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 
 /**
  * This source is used for testing in Pulsar sink. We would generate a fix 
number of records by the
@@ -183,20 +191,30 @@ public class ControlSource extends AbstractRichFunction
 private static class StopSignal implements Closeable {
 private static final Logger LOG = 
LoggerFactory.getLogger(StopSignal.class);
 
-private final String topic;
 private final int desiredCounts;
 // This is a thread-safe list.
 private final List consumedRecords;
 private final AtomicLong deadline;
 private final ExecutorService executor;
+private final Consumer consumer;
+private final AtomicReference 
throwableException;
 
 public StopSignal(
 PulsarRuntimeOperator operator, String topic, int 
messageCounts, Duration timeout) {
-this.topic = topic;
 this.desiredCounts = messageCounts;
 this.consumedRecords = Collections.synchronizedList(new 
ArrayList<>(messageCounts));
 this.deadline = new AtomicLong(timeout.toMillis() + 
System.currentTimeMillis());
 this.executor = Executors.newSingleThreadExecutor();
+ConsumerBuilder consumerBuilder =
+operator.client()
+.newConsumer(Schema.STRING)
+.topic(topic)
+.subscriptionName(randomAlphanumeric(10))
+.subscription

[flink-connector-pulsar] branch main updated (40c2705 -> 3199922)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


from 40c2705  [hotfix] Enable checkstyle only for source- and testsource 
directories, not for generated sources
 new d21147e  [FLINK-28853][pulsar] Implement pausing Pulsar splits in case 
of watermark drift
 new 9cce627  [FLINK-28853][connector-base] Add 
allow-unaligned-source-splits config option
 new b0a1678  [FLINK-28853] Address PR comments / Add Kafka and Pulsar 
split pausing tests
 new 08e7b19  [hotfix][docs]: Fix minor grammar and spelling mistakes
 new dfb5fd8  [FLINK-29580][Connector/Pulsar] Remove 
pulsar.consumer.autoUpdatePartitionsIntervalSeconds option. (#21070)
 new 618daa2  [FLINK-29433][Connector/Pulsar] Support Auth through the 
builder pattern in Pulsar connector. (#21071)
 new 62da02a  [FLINK-28085][Connector/Pulsar] Close the pending Pulsar 
transactions before closing pipeline (#21075)
 new c6a20a6  [FLINK-28820][Connector/Pulsar] Improve the writing 
performance for PulsarSink (#21074)
 new 1c88632  [FLINK-29624][Common][Connector][Filesystem] Upgrade 
org.apache.commons:commons-lang3 from 3.3.2 to 3.12.0
 new 4f204e8  [FLINK-29709][Connector/Pulsar] Bump the Pulsar to latest 
2.10.2 (#21204)
 new d6e2e06  [FLINK-28083][Connector/Pulsar] PulsarSource work with 
object-reusing DeserializationSchema. (#21205)
 new f6ead4c  [FLINK-29860][Connectors/Pulsar] InlineElement fields should 
be transient  (#21262)
 new a6ba997  [FLINK-29830][Connector/Pulsar] Create the topic with schema 
before consuming messages in PulsarSinkITCase. (#21252)
 new d8abb2a  [FLINK-26027][Connector/Pulsar] Expose Pulsar producer 
metrics and add FLIP-33 sink metrics. (#21249)
 new d330e40  [FLINK-30254][Connector/Pulsar] Sync Pulsar updates and set 
Flink version to 1.17-SNAPSHOT
 new 3199922  [FLINK-30254][Connector/Pulsar] Resolve dependency 
convergence errors

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/push_pr.yml  |   6 +-
 .github/workflows/weekly.yml   |   2 +-
 flink-connector-pulsar/pom.xml |  13 +--
 .../pulsar/common/metrics/MetricNames.java |  63 
 .../common/metrics/ProducerMetricsInterceptor.java |  67 
 .../pulsar/common/schema/PulsarSchema.java |  15 ++-
 .../pulsar/common/schema/PulsarSchemaUtils.java|   9 +-
 .../common/utils/PulsarTransactionUtils.java   |   2 +
 .../connector/pulsar/sink/PulsarSinkBuilder.java   |  33 ++
 .../connector/pulsar/sink/PulsarSinkOptions.java   |  16 +++
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |   4 -
 .../pulsar/sink/config/SinkConfiguration.java  |  29 +++---
 .../connector/pulsar/sink/writer/PulsarWriter.java |  86 +++-
 .../pulsar/sink/writer/router/MessageKeyHash.java  |   2 +-
 .../sink/writer/router/TopicRoutingMode.java   |   2 +-
 .../sink/writer/topic/TopicProducerRegister.java   |  86 +++-
 .../pulsar/source/PulsarSourceBuilder.java |  33 ++
 .../pulsar/source/PulsarSourceOptions.java |  21 ++--
 .../pulsar/source/config/CursorVerification.java   |   2 +-
 .../source/config/PulsarSourceConfigUtils.java |   5 -
 .../pulsar/source/config/SourceConfiguration.java  |  16 ++-
 .../source/enumerator/PulsarSourceEnumerator.java  |   8 ++
 .../source/enumerator/assigner/SplitAssigner.java  |   3 +
 .../enumerator/assigner/SplitAssignerBase.java |   5 +
 .../source/enumerator/cursor/StartCursor.java  |   4 +-
 .../source/enumerator/cursor/StopCursor.java   |   6 +-
 .../source/reader/PulsarSourceReaderFactory.java   |  21 ++--
 .../source/reader/emitter/PulsarRecordEmitter.java |  63 ++--
 .../reader/fetcher/PulsarFetcherManagerBase.java   |  41 +---
 .../fetcher/PulsarOrderedFetcherManager.java   |  22 ++--
 .../fetcher/PulsarUnorderedFetcherManager.java |  18 ++--
 .../source/reader/message/PulsarMessage.java   |  74 -
 .../reader/message/PulsarMessageCollector.java |  60 ---
 .../reader/source/PulsarOrderedSourceReader.java   |  19 ++--
 .../reader/source/PulsarSourceReaderBase.java  |  31 --
 .../reader/source/PulsarUnorderedSourceReader.java |  41 +++-
 .../split/PulsarOrderedPartitionSplitReader.java   |  13 +--
 .../split/PulsarPartitionSplitReaderBase.java  | 114 -
 .../split/PulsarUnorderedPartitionSplitReader.java |  27 ++---
 .../connector/pulsar/sink/PulsarSinkITCase.java|   6 +-
 .../writer/topic/TopicProducerRegisterTest.java|   7 +-
 .../source/enumerator/cursor/StopCursorTest.java   

[flink-connector-pulsar] 03/16: [FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit b0a1678f91dae9b7bee35f77c79b441f365499be
Author: Maximilian Michels 
AuthorDate: Thu Sep 1 16:15:33 2022 +0200

[FLINK-28853] Address PR comments / Add Kafka and Pulsar split pausing tests
---
 .../split/PulsarPartitionSplitReaderBase.java  |  9 +++---
 .../source/PulsarOrderedSourceReaderTest.java  | 32 ++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index c1459a6..b884c57 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -162,7 +162,7 @@ abstract class PulsarPartitionSplitReaderBase
 
 List newSplits = splitsChanges.splits();
 Preconditions.checkArgument(
-newSplits.size() == 1, "This pulsar split reader only support 
one split.");
+newSplits.size() == 1, "This pulsar split reader only supports 
one split.");
 this.registeredSplit = newSplits.get(0);
 
 // Open stop cursor.
@@ -184,9 +184,10 @@ abstract class PulsarPartitionSplitReaderBase
 public void pauseOrResumeSplits(
 Collection splitsToPause,
 Collection splitsToResume) {
-if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
-throw new IllegalStateException("This pulsar split reader only 
support one split.");
-}
+// This shouldn't happen but just in case...
+Preconditions.checkState(
+splitsToPause.size() + splitsToResume.size() <= 1,
+"This pulsar split reader only supports one split.");
 
 if (!splitsToPause.isEmpty()) {
 pulsarConsumer.pause();
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
index 9806108..4ec1912 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
@@ -29,11 +29,13 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -130,6 +132,36 @@ class PulsarOrderedSourceReaderTest extends 
PulsarSourceReaderTestBase {
 }
 }
 
+@TestTemplate
+@Timeout(600)
+void supportsPausingOrResumingSplits(
+PulsarSourceReaderBase reader, Boundedness boundedness, 
String topicName)
+throws Exception {
+final PulsarPartitionSplit split =
+createPartitionSplit(topicName, 0, boundedness, 
MessageId.earliest);
+
+reader.addSplits(Collections.singletonList(split));
+
+TestingReaderOutput output = new TestingReaderOutput<>();
+
+reader.pauseOrResumeSplits(
+Collections.singletonList(split.splitId()), 
Collections.emptyList());
+
+InputStatus status = reader.pollNext(output);
+assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+
+reader.pauseOrResumeSplits(Collections.emptyList(), 
Collections.singleton(split.splitId()));
+
+do {
+status = reader.pollNext(output);
+Thread.sleep(5);
+} while (status != InputStatus.MORE_AVAILABLE);
+
+assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE);
+
+reader.close();
+}
+
 private void setupSourceReader(
 PulsarSourceReaderBase reader,
 String topicName,



[flink-connector-pulsar] 14/16: [FLINK-26027][Connector/Pulsar] Expose Pulsar producer metrics and add FLIP-33 sink metrics. (#21249)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d8abb2a4cf2bfc692f2386e62220520680193381
Author: Yufan Sheng 
AuthorDate: Fri Nov 11 21:10:11 2022 +0800

[FLINK-26027][Connector/Pulsar] Expose Pulsar producer metrics and add 
FLIP-33 sink metrics. (#21249)
---
 .../pulsar/common/metrics/MetricNames.java | 63 
 .../common/metrics/ProducerMetricsInterceptor.java | 67 +
 .../connector/pulsar/sink/PulsarSinkOptions.java   | 14 
 .../pulsar/sink/config/SinkConfiguration.java  | 16 +++-
 .../connector/pulsar/sink/writer/PulsarWriter.java |  6 +-
 .../sink/writer/topic/TopicProducerRegister.java   | 86 +-
 .../pulsar/source/PulsarSourceOptions.java | 14 
 .../pulsar/source/config/SourceConfiguration.java  | 16 +++-
 .../source/enumerator/PulsarSourceEnumerator.java  |  8 ++
 .../source/enumerator/assigner/SplitAssigner.java  |  3 +
 .../enumerator/assigner/SplitAssignerBase.java |  5 ++
 .../source/reader/PulsarSourceReaderFactory.java   |  6 +-
 .../split/PulsarOrderedPartitionSplitReader.java   |  6 +-
 .../split/PulsarPartitionSplitReaderBase.java  | 66 -
 .../split/PulsarUnorderedPartitionSplitReader.java |  4 +-
 .../connector/pulsar/sink/PulsarSinkITCase.java|  4 +-
 .../writer/topic/TopicProducerRegisterTest.java|  7 +-
 .../source/enumerator/cursor/StopCursorTest.java   |  6 +-
 .../split/PulsarPartitionSplitReaderTestBase.java  | 12 ++-
 .../testutils/sink/PulsarSinkTestSuiteBase.java| 37 --
 20 files changed, 389 insertions(+), 57 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
new file mode 100644
index 000..5156b47
--- /dev/null
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/MetricNames.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+/** The constant class for holding all the custom metrics names in Pulsar. */
+public final class MetricNames {
+
+private MetricNames() {
+// No public constructor.
+}
+
+public static final String PULSAR_PRODUCER_METRIC_NAME = "PulsarProducer";
+public static final String NUM_MSGS_SENT = "numMsgsSent";
+public static final String NUM_BYTES_SENT = "numBytesSent";
+public static final String NUM_SEND_FAILED = "numSendFailed";
+public static final String NUM_ACKS_RECEIVED = "numAcksReceived";
+public static final String SEND_MSGS_RATE = "sendMsgsRate";
+public static final String SEND_BYTES_RATE = "sendBytesRate";
+public static final String SEND_LATENCY_MILLIS_50_PCT = 
"sendLatencyMillis50pct";
+public static final String SEND_LATENCY_MILLIS_75_PCT = 
"sendLatencyMillis75pct";
+public static final String SEND_LATENCY_MILLIS_95_PCT = 
"sendLatencyMillis95pct";
+public static final String SEND_LATENCY_MILLIS_99_PCT = 
"sendLatencyMillis99pct";
+public static final String SEND_LATENCY_MILLIS_999_PCT = 
"sendLatencyMillis999pct";
+public static final String SEND_LATENCY_MILLIS_MAX = 
"sendLatencyMillisMax";
+public static final String TOTAL_MSGS_SENT = "totalMsgsSent";
+public static final String TOTAL_BYTES_SENT = "totalBytesSent";
+public static final String TOTAL_SEND_FAILED = "totalSendFailed";
+public static final String TOTAL_ACKS_RECEIVED = "totalAcksReceived";
+public static final String PENDING_QUEUE_SIZE = "pendingQueueSize";
+
+public static final String PULSAR_CONSUMER_METRIC_NAME = "PulsarConsumer";
+public static final String NUM_MSGS_RECEIVED = "numMsgsReceived";
+public static final String NUM_BYTES_RECEIVED = "numBytesReceived";
+public static final String RATE_MSGS_RECEIVED = "rateMsgsReceived";
+public static final String RATE_BYTES_RECEIVED = "rateBytesReceived";
+public static final String N

[flink-connector-pulsar] 01/16: [FLINK-28853][pulsar] Implement pausing Pulsar splits in case of watermark drift

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d21147e8bc3dab7d42ecd7b1fa48d2736ff7b918
Author: Dawid Wysakowicz 
AuthorDate: Fri Mar 25 09:33:33 2022 +0100

[FLINK-28853][pulsar] Implement pausing Pulsar splits in case of watermark 
drift
---
 .../source/reader/source/PulsarSourceReaderBase.java |  8 
 .../reader/split/PulsarPartitionSplitReaderBase.java | 16 
 2 files changed, 24 insertions(+)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
index 0122021..c910e94 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 
+import java.util.Collection;
+
 /**
  * The common pulsar source reader for both ordered & unordered message 
consuming.
  *
@@ -75,6 +77,12 @@ abstract class PulsarSourceReaderBase
 return splitState.toPulsarPartitionSplit();
 }
 
+@Override
+public void pauseOrResumeSplits(
+Collection splitsToPause, Collection 
splitsToResume) {
+splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+}
+
 @Override
 public void close() throws Exception {
 // Close the all the consumers first.
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 9537969..c1459a6 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -179,6 +180,21 @@ abstract class PulsarPartitionSplitReaderBase
 LOG.info("Register split {} consumer for current reader.", 
registeredSplit);
 }
 
+@Override
+public void pauseOrResumeSplits(
+Collection splitsToPause,
+Collection splitsToResume) {
+if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
+throw new IllegalStateException("This pulsar split reader only 
support one split.");
+}
+
+if (!splitsToPause.isEmpty()) {
+pulsarConsumer.pause();
+} else if (!splitsToResume.isEmpty()) {
+pulsarConsumer.resume();
+}
+}
+
 @Override
 public void wakeUp() {
 // Nothing to do on this method.



[flink-connector-pulsar] 04/16: [hotfix][docs]: Fix minor grammar and spelling mistakes

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 08e7b1997b209d6f484fd753d66541759de55a38
Author: Ryan Skraba 
AuthorDate: Wed Sep 1 10:27:07 2021 +0200

[hotfix][docs]: Fix minor grammar and spelling mistakes

docs: Incomplete sentence
docs: use consistent voice

chore: Connectors document review

chore: Reword SplitEnumerator

[hotfix][docs] Use consistent intro sentence

Apply grammar fixes from code review

Co-authored-by: Matthias Pohl 
---
 .../connector/pulsar/source/enumerator/cursor/StartCursor.java  | 4 ++--
 .../flink/connector/pulsar/source/enumerator/cursor/StopCursor.java | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
index 9c1d699..eae7cbf 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
@@ -28,8 +28,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import java.io.Serializable;
 
 /**
- * A interface for users to specify the start position of a pulsar 
subscription. Since it would be
- * serialized into split. The implementation for this interface should be well 
considered. I don't
+ * An interface for users to specify the start position of a pulsar 
subscription. Since it would be
+ * serialized into split, the implementation for this interface should be well 
considered. I don't
  * recommend adding extra internal state for this implementation.
  *
  * This class would be used only for {@link SubscriptionType#Exclusive} and 
{@link
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index d44c78f..866d036 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -33,9 +33,9 @@ import org.apache.pulsar.client.api.MessageId;
 import java.io.Serializable;
 
 /**
- * A interface for users to specify the stop position of a pulsar 
subscription. Since it would be
- * serialized into split. The implementation for this interface should be well 
considered. I don't
- * recommend adding extra internal state for this implementation.
+ * An interface for users to specify the stop position of a pulsar 
subscription. Since it would be
+ * serialized into split, the implementation for this interface should be well 
considered. It's not
+ * recommended to add extra internal state for this implementation.
  */
 @PublicEvolving
 @FunctionalInterface



[flink-connector-pulsar] 05/16: [FLINK-29580][Connector/Pulsar] Remove pulsar.consumer.autoUpdatePartitionsIntervalSeconds option. (#21070)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit dfb5fd86c481f09d5ac2c1abe8a2acaa0f60a862
Author: Yufan Sheng 
AuthorDate: Tue Oct 18 13:27:38 2022 +0800

[FLINK-29580][Connector/Pulsar] Remove 
pulsar.consumer.autoUpdatePartitionsIntervalSeconds option. (#21070)
---
 .../apache/flink/connector/pulsar/source/PulsarSourceOptions.java  | 7 ---
 .../connector/pulsar/source/config/PulsarSourceConfigUtils.java| 5 -
 2 files changed, 12 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 440e96b..d0ef6de 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -584,13 +584,6 @@ public final class PulsarSourceOptions {
 .defaultValue(false)
 .withDescription("If enabled, the consumer will 
automatically retry messages.");
 
-public static final ConfigOption 
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS =
-ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"autoUpdatePartitionsIntervalSeconds")
-.intType()
-.defaultValue(60)
-.withDescription(
-"The interval (in seconds) of updating partitions. 
This only works if autoUpdatePartitions is enabled.");
-
 public static final ConfigOption 
PULSAR_REPLICATE_SUBSCRIPTION_STATE =
 ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"replicateSubscriptionState")
 .booleanType()
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 063d588..d76ba0a 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
@@ -44,7 +43,6 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACK_TIMEOUT_MILLIS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL;
-import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_PROPERTIES;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
@@ -121,9 +119,6 @@ public final class PulsarSourceConfigUtils {
 configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
 configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
 
createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
-configuration.useOption(
-PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
-v -> builder.autoUpdatePartitionsInterval(v, SECONDS));
 configuration.useOption(PULSAR_RETRY_ENABLE, builder::enableRetry);
 configuration.useOption(
 PULSAR_MAX_PENDING_CHUNKED_MESSAGE, 
builder::maxPendingChunkedMessage);



[flink-connector-pulsar] 09/16: [FLINK-29624][Common][Connector][Filesystem] Upgrade org.apache.commons:commons-lang3 from 3.3.2 to 3.12.0

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 1c886323ad9edf985efbf04e602ed524d6fa6beb
Author: Martijn Visser 
AuthorDate: Thu Oct 13 11:14:49 2022 +0200

[FLINK-29624][Common][Connector][Filesystem] Upgrade 
org.apache.commons:commons-lang3 from 3.3.2 to 3.12.0

* Remove Pulsar specific version of `commons-lang3` since Flink now uses a 
newer version
* Replace usage of deprecated method 
`org.apache.commons.lang3.ArrayUtils#add` with 
`org.apache.commons.lang3.ArrayUtils#insert`
* Replace usage of `org.apache.commons.lang3.StringEscapeUtils` with 
`org.apache.commons.text.StringEscapeUtils`
---
 flink-connector-pulsar/pom.xml | 10 --
 1 file changed, 10 deletions(-)

diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
index 30c0a1b..3118a9a 100644
--- a/flink-connector-pulsar/pom.xml
+++ b/flink-connector-pulsar/pom.xml
@@ -41,7 +41,6 @@ under the License.


0.6.1
1.7.0
-   
3.11
4.1.77.Final
1.45.1

@@ -137,15 +136,6 @@ under the License.
test

 
-   
-   
-   
-   org.apache.commons
-   commons-lang3
-   ${pulsar-commons-lang3.version}
-   test
-   
-


 



[flink-connector-pulsar] 11/16: [FLINK-28083][Connector/Pulsar] PulsarSource work with object-reusing DeserializationSchema. (#21205)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d6e2e06964c4cf40a5e58d4bc2f976a65b4a566b
Author: Yufan Sheng 
AuthorDate: Tue Nov 8 14:53:22 2022 +0800

[FLINK-28083][Connector/Pulsar] PulsarSource work with object-reusing 
DeserializationSchema. (#21205)
---
 .../source/reader/PulsarSourceReaderFactory.java   | 23 +++
 .../source/reader/emitter/PulsarRecordEmitter.java | 63 --
 .../reader/fetcher/PulsarFetcherManagerBase.java   | 27 
 .../fetcher/PulsarOrderedFetcherManager.java   | 18 +++---
 .../fetcher/PulsarUnorderedFetcherManager.java | 14 ++--
 .../source/reader/message/PulsarMessage.java   | 74 --
 .../reader/message/PulsarMessageCollector.java | 60 --
 .../reader/source/PulsarOrderedSourceReader.java   | 15 +++--
 .../reader/source/PulsarSourceReaderBase.java  | 18 ++
 .../reader/source/PulsarUnorderedSourceReader.java | 13 ++--
 .../split/PulsarOrderedPartitionSplitReader.java   | 13 +---
 .../split/PulsarPartitionSplitReaderBase.java  | 33 +++---
 .../split/PulsarUnorderedPartitionSplitReader.java | 11 +---
 .../source/enumerator/cursor/StopCursorTest.java   | 17 ++---
 .../PulsarOrderedPartitionSplitReaderTest.java |  8 +--
 .../split/PulsarPartitionSplitReaderTestBase.java  | 74 ++
 16 files changed, 175 insertions(+), 306 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 820888a..2e83ab5 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import 
org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
@@ -33,6 +33,7 @@ import 
org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPart
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
@@ -70,25 +71,25 @@ public final class PulsarSourceReaderFactory {
 
 // Create a message queue with the predefined source option.
 int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
-FutureCompletingBlockingQueue>> 
elementsQueue =
+FutureCompletingBlockingQueue>> 
elementsQueue =
 new FutureCompletingBlockingQueue<>(queueCapacity);
 
+PulsarRecordEmitter recordEmitter = new 
PulsarRecordEmitter<>(deserializationSchema);
+
 // Create different pulsar source reader by subscription type.
 SubscriptionType subscriptionType = 
sourceConfiguration.getSubscriptionType();
 if (subscriptionType == SubscriptionType.Failover
 || subscriptionType == SubscriptionType.Exclusive) {
 // Create an ordered split reader supplier.
-Supplier> 
splitReaderSupplier =
+Supplier splitReaderSupplier =
 () ->
-new PulsarOrderedPartitionSplitReader<>(
-pulsarClient,
-pulsarAdmin,
-sourceConfiguration,
-deserializationSchema);
+new PulsarOrderedPartitionSplitReader(
+pulsarClient, pulsarAdmin, 
sourceConfiguration);
 
 return new PulsarOrderedSourceReader<>(
 elementsQueue,
 splitReaderSupplier,
+recordEmitter,
 readerContext,
 sourceConfigu

[flink-connector-pulsar] 15/16: [FLINK-30254][Connector/Pulsar] Sync Pulsar updates and set Flink version to 1.17-SNAPSHOT

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit d330e403dfe6e2b1cb5051c93d1ff77d103746a3
Author: Martijn Visser 
AuthorDate: Wed Nov 30 20:03:59 2022 +0100

[FLINK-30254][Connector/Pulsar] Sync Pulsar updates and set Flink version 
to 1.17-SNAPSHOT
---
 .github/workflows/push_pr.yml | 6 +++---
 .github/workflows/weekly.yml  | 2 +-
 pom.xml   | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index be18918..759de4b 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -22,6 +22,6 @@ jobs:
   compile_and_test:
 uses: ./.github/workflows/ci.yml
 with:
-  flink_version: 1.16.0
-  flink_url: 
https://dist.apache.org/repos/dist/release/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz
-  cache_flink_binary: true
\ No newline at end of file
+  flink_version: 1.17-SNAPSHOT
+  flink_url: 
https://s3.amazonaws.com/flink-nightly/flink-1.17-SNAPSHOT-bin-scala_2.12.tgz
+  cache_flink_binary: false
\ No newline at end of file
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 28aed6b..cb628f2 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -24,7 +24,7 @@ jobs:
   compile_and_test:
 strategy:
   matrix:
-flink: [1.16-SNAPSHOT, 1.17-SNAPSHOT]
+flink: [1.17-SNAPSHOT]
 uses: ./.github/workflows/ci.yml
 with:
   flink_version: ${{ matrix.flink }}
diff --git a/pom.xml b/pom.xml
index 2fca0a9..9c5040e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@ under the License.
 
 
 
-1.16.0
+1.17-SNAPSHOT
 
 2.13.4.20221013
 1.45.1



[flink-connector-pulsar] 06/16: [FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in Pulsar connector. (#21071)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 618daa266d483f4b0d3af03ffbb47315e872df3a
Author: Yufan Sheng 
AuthorDate: Tue Oct 18 13:29:39 2022 +0800

[FLINK-29433][Connector/Pulsar] Support Auth through the builder pattern in 
Pulsar connector. (#21071)
---
 .../connector/pulsar/sink/PulsarSinkBuilder.java   | 33 ++
 .../pulsar/source/PulsarSourceBuilder.java | 33 ++
 2 files changed, 66 insertions(+)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 332a27a..4e8f9b2 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -38,9 +38,13 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
@@ -243,6 +247,35 @@ public class PulsarSinkBuilder {
 return this;
 }
 
+/**
+ * Configure the authentication provider to use in the Pulsar client 
instance.
+ *
+ * @param authPluginClassName name of the Authentication-Plugin you want 
to use
+ * @param authParamsString string which represents parameters for the 
Authentication-Plugin,
+ * e.g., "key1:val1,key2:val2"
+ * @return this PulsarSinkBuilder.
+ */
+public PulsarSinkBuilder setAuthentication(
+String authPluginClassName, String authParamsString) {
+configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
+return this;
+}
+
+/**
+ * Configure the authentication provider to use in the Pulsar client 
instance.
+ *
+ * @param authPluginClassName name of the Authentication-Plugin you want 
to use
+ * @param authParams map which represents parameters for the 
Authentication-Plugin
+ * @return this PulsarSinkBuilder.
+ */
+public PulsarSinkBuilder setAuthentication(
+String authPluginClassName, Map authParams) {
+configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
+configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
+return this;
+}
+
 /**
  * Set an arbitrary property for the PulsarSink and Pulsar Producer. The 
valid keys can be found
  * in {@link PulsarSinkOptions} and {@link PulsarOptions}.
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 5309dd0..5aac3d2 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -44,11 +44,15 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
 import static java.lang.Boolean.FALSE;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
@@ -369,6 +373,35 @@ public final class PulsarSourceBuilder {
 return self;
 }
 
+/**
+ * Configu

[flink-connector-pulsar] 07/16: [FLINK-28085][Connector/Pulsar] Close the pending Pulsar transactions before closing pipeline (#21075)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 62da02aec6c62067bacaef932fff496553cc5c60
Author: Yufan Sheng 
AuthorDate: Tue Oct 18 20:23:24 2022 +0800

[FLINK-28085][Connector/Pulsar] Close the pending Pulsar transactions 
before closing pipeline (#21075)
---
 .../common/utils/PulsarTransactionUtils.java   |  2 ++
 .../reader/fetcher/PulsarFetcherManagerBase.java   | 12 ++
 .../reader/source/PulsarOrderedSourceReader.java   |  3 +++
 .../reader/source/PulsarSourceReaderBase.java  |  7 ++
 .../reader/source/PulsarUnorderedSourceReader.java | 27 ++
 .../split/PulsarUnorderedPartitionSplitReader.java | 14 +++
 6 files changed, 61 insertions(+), 4 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
index a48b4d4..c8ffe63 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
@@ -60,6 +60,8 @@ public final class PulsarTransactionUtils {
 /**
  * This is a bug in original {@link 
TransactionCoordinatorClientException#unwrap(Throwable)}
  * method. Pulsar wraps the {@link ExecutionException} which hides the 
real execution exception.
+ *
+ * This bug should be fixed after the 2.10.0 release. We just keep this 
for safety.
  */
 public static TransactionCoordinatorClientException unwrap(
 TransactionCoordinatorClientException e) {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
index 6abb8bd..1622eee 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
@@ -86,6 +86,18 @@ public abstract class PulsarFetcherManagerBase
 }
 }
 
+/** Close the finished split related fetcher. */
+public void closeFetcher(String splitId) {
+Integer fetchId = splitFetcherMapping.remove(splitId);
+if (fetchId != null) {
+fetcherStatus.remove(fetchId);
+SplitFetcher, PulsarPartitionSplit> fetcher = 
fetchers.remove(fetchId);
+if (fetcher != null) {
+fetcher.shutdown();
+}
+}
+}
+
 protected SplitFetcher, PulsarPartitionSplit> 
getOrCreateFetcher(
 String splitId) {
 SplitFetcher, PulsarPartitionSplit> fetcher;
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
index a06d8a4..6ff0466 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
@@ -109,6 +109,9 @@ public class PulsarOrderedSourceReader extends 
PulsarSourceReaderBase
 
 @Override
 protected void onSplitFinished(Map 
finishedSplitIds) {
+// Close all the finished splits.
+closeFinishedSplits(finishedSplitIds.keySet());
+
 // We don't require new splits, all the splits are pre-assigned by 
source enumerator.
 if (LOG.isDebugEnabled()) {
 LOG.debug("onSplitFinished event: {}", finishedSplitIds);
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
index c910e94..eee6950 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 
 import java.util.Collection;
+import java.util.Set;
 
 /**
  * The common pulsar source reader for both ordered & unordered message 
consuming.
@@ -92,4 +93,10 @@ abstract class PulsarSource

[flink-connector-pulsar] 10/16: [FLINK-29709][Connector/Pulsar] Bump the Pulsar to latest 2.10.2 (#21204)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 4f204e8091fc870c0928b3834de691dc8b2c4dfb
Author: Yufan Sheng 
AuthorDate: Mon Nov 7 11:44:15 2022 +0800

[FLINK-29709][Connector/Pulsar] Bump the Pulsar to latest 2.10.2 (#21204)
---
 flink-connector-pulsar/pom.xml|  9 -
 .../connector/pulsar/common/schema/PulsarSchema.java  | 15 ++-
 .../connector/pulsar/common/schema/PulsarSchemaUtils.java |  9 +++--
 .../src/main/resources/META-INF/NOTICE|  8 
 4 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/flink-connector-pulsar/pom.xml b/flink-connector-pulsar/pom.xml
index 3118a9a..5881495 100644
--- a/flink-connector-pulsar/pom.xml
+++ b/flink-connector-pulsar/pom.xml
@@ -35,7 +35,7 @@ under the License.
jar
 

-   2.10.1
+   2.10.2
3.21.7
 

@@ -43,6 +43,7 @@ under the License.
1.7.0
4.1.77.Final
1.45.1
+   2.9.1

 

@@ -135,6 +136,12 @@ under the License.
${pulsar.version}
test

+   
+   com.github.ben-manes.caffeine
+   caffeine
+   ${pulsar-caffeine.version}
+   test
+   
 


diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index bb09315..fe5daca 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -155,6 +155,9 @@ public final class PulsarSchema implements Serializable {
 oos.writeUTF(entry.getKey());
 oos.writeUTF(entry.getValue());
 }
+
+// Timestamp
+oos.writeLong(schemaInfo.getTimestamp());
 }
 
 private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
@@ -177,7 +180,17 @@ public final class PulsarSchema implements Serializable 
{
 properties.put(ois.readUTF(), ois.readUTF());
 }
 
-this.schemaInfo = new SchemaInfoImpl(name, schemaBytes, type, 
properties);
+// Timestamp
+long timestamp = ois.readLong();
+
+this.schemaInfo =
+SchemaInfoImpl.builder()
+.name(name)
+.schema(schemaBytes)
+.type(type)
+.properties(properties)
+.timestamp(timestamp)
+.build();
 this.schema = createSchema(schemaInfo);
 }
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
index 4b1f7ee..00bf2cf 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
@@ -181,8 +181,13 @@ public final class PulsarSchemaUtils {
 Map properties = new 
HashMap<>(schemaInfo.getProperties());
 properties.put(CLASS_INFO_PLACEHOLDER, typeClass.getName());
 
-return new SchemaInfoImpl(
-schemaInfo.getName(), schemaInfo.getSchema(), 
schemaInfo.getType(), properties);
+return SchemaInfoImpl.builder()
+.name(schemaInfo.getName())
+.schema(schemaInfo.getSchema())
+.type(schemaInfo.getType())
+.properties(properties)
+.timestamp(schemaInfo.getTimestamp())
+.build();
 }
 
 @SuppressWarnings("unchecked")
diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE 
b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 151da33..1aee6f4 100644
--- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,10 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.10.1
-- org.apache.pulsar:pulsar-client-admin-api:2.10.1
-- org.apache.pulsar:pulsar-client-all:2.10.1
-- org.apache.pulsar:pulsar-client-api:2.10.1
+- or

[flink-connector-pulsar] 12/16: [FLINK-29860][Connectors/Pulsar] InlineElement fields should be transient (#21262)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit f6ead4c4ef3c195381c7f5090764fbda881ebcc4
Author: noelo 
AuthorDate: Thu Nov 10 12:27:55 2022 +0100

[FLINK-29860][Connectors/Pulsar] InlineElement fields should be transient  
(#21262)
---
 .../flink/connector/pulsar/sink/writer/router/MessageKeyHash.java   | 2 +-
 .../flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java | 2 +-
 .../apache/flink/connector/pulsar/source/config/CursorVerification.java | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
index bbac99e..3ad092d 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/MessageKeyHash.java
@@ -62,7 +62,7 @@ public enum MessageKeyHash implements DescribedEnum {
 };
 
 private final String name;
-private final InlineElement desc;
+private final transient InlineElement desc;
 
 MessageKeyHash(String name, InlineElement desc) {
 this.name = name;
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
index c327435..f251f8f 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/router/TopicRoutingMode.java
@@ -67,7 +67,7 @@ public enum TopicRoutingMode implements DescribedEnum {
 code(TopicRouter.class.getSimpleName(;
 
 private final String name;
-private final InlineElement desc;
+private final transient InlineElement desc;
 
 TopicRoutingMode(String name, InlineElement desc) {
 this.name = name;
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/CursorVerification.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/CursorVerification.java
index fc70bdd..d604ceb 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/CursorVerification.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/CursorVerification.java
@@ -35,7 +35,7 @@ public enum CursorVerification implements DescribedEnum {
 /** Print a warn message and start consuming from the valid offset. */
 WARN_ON_MISMATCH(text("Print a warn message and start consuming from the 
valid offset."));
 
-private final InlineElement desc;
+private final transient InlineElement desc;
 
 CursorVerification(InlineElement desc) {
 this.desc = desc;



[flink-connector-pulsar] 02/16: [FLINK-28853][connector-base] Add allow-unaligned-source-splits config option

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 9cce62760a86b3dc3c4c8aef45d7e1444a436914
Author: Sebastian Mattheis 
AuthorDate: Sun Jul 24 12:09:48 2022 +0200

[FLINK-28853][connector-base] Add allow-unaligned-source-splits config 
option
---
 .../pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java  | 6 --
 .../pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java   | 6 --
 .../pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java | 6 --
 .../pulsar/source/reader/source/PulsarOrderedSourceReader.java  | 3 ++-
 .../pulsar/source/reader/source/PulsarUnorderedSourceReader.java| 3 ++-
 5 files changed, 16 insertions(+), 8 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
index 1f9a35f..6abb8bd 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.source.reader.fetcher;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
@@ -57,8 +58,9 @@ public abstract class PulsarFetcherManagerBase
  */
 protected PulsarFetcherManagerBase(
 
FutureCompletingBlockingQueue>> 
elementsQueue,
-Supplier, PulsarPartitionSplit>> 
splitReaderSupplier) {
-super(elementsQueue, splitReaderSupplier);
+Supplier, PulsarPartitionSplit>> 
splitReaderSupplier,
+Configuration configuration) {
+super(elementsQueue, splitReaderSupplier, configuration);
 }
 
 /**
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
index f8b89ee..103dc62 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.source.reader.fetcher;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -49,8 +50,9 @@ public class PulsarOrderedFetcherManager extends 
PulsarFetcherManagerBase
 
 public PulsarOrderedFetcherManager(
 
FutureCompletingBlockingQueue>> 
elementsQueue,
-Supplier, PulsarPartitionSplit>> 
splitReaderSupplier) {
-super(elementsQueue, splitReaderSupplier);
+Supplier, PulsarPartitionSplit>> 
splitReaderSupplier,
+Configuration configuration) {
+super(elementsQueue, splitReaderSupplier, configuration);
 }
 
 public void acknowledgeMessages(Map 
cursorsToCommit) {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
index 1523b9a..449e992 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.source.reader.fetcher;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -49,8 +50,9 @@ public class PulsarUnorderedFetcherManager extends 
PulsarFetcherManagerBase>> 
elementsQueue,
-Supplier, PulsarPartitionSpl

[flink-connector-pulsar] 08/16: [FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#21074)

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit c6a20a66bf22383c114a6ae23f70413d6736aa51
Author: Yufan Sheng 
AuthorDate: Tue Oct 18 21:32:37 2022 +0800

[FLINK-28820][Connector/Pulsar] Improve the writing performance for 
PulsarSink (#21074)
---
 .../connector/pulsar/sink/PulsarSinkOptions.java   |  2 +
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |  4 --
 .../pulsar/sink/config/SinkConfiguration.java  | 13 
 .../connector/pulsar/sink/writer/PulsarWriter.java | 82 +-
 4 files changed, 35 insertions(+), 66 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index dafb8aa..0433bb0 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -127,6 +127,8 @@ public final class PulsarSinkOptions {
 "The allowed transaction recommit times if we meet 
some retryable exception."
 + " This is used in Pulsar Transaction.");
 
+/** @deprecated This config option was removed for better performance. */
+@Deprecated
 public static final ConfigOption 
PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
 ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
 .intType()
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 5f935a4..61cfd5a 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -84,10 +84,6 @@ public final class PulsarSinkConfigUtils {
 PULSAR_SEND_TIMEOUT_MS,
 Math::toIntExact,
 ms -> builder.sendTimeout(ms, MILLISECONDS));
-configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, 
builder::maxPendingMessages);
-configuration.useOption(
-PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
-builder::maxPendingMessagesAcrossPartitions);
 configuration.useOption(
 PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
 s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index 768b730..d6a6ee3 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.Schema;
 import java.util.Objects;
 
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
-import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
@@ -51,7 +50,6 @@ public class SinkConfiguration extends PulsarConfiguration {
 private final int partitionSwitchSize;
 private final MessageKeyHash messageKeyHash;
 private final boolean enableSchemaEvolution;
-private final int maxPendingMessages;
 private final int maxRecommitTimes;
 
 public SinkConfiguration(Configuration configuration) {
@@ -63,7 +61,6 @@ public class SinkConfiguration extends PulsarConfiguration {
 this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
 this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
 this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
-this.maxPendingMessages = 
get(PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM);
 this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
 }
 
@@ -111,14 +108,6 @@ public class SinkConfiguration extends PulsarConfiguration 
{
 return enableSchemaEvolution;
 }
 
-/**
- * Pulsar message is sent asynchronously. Set this option for limiting the 
pending messages 

[flink-connector-pulsar] 16/16: [FLINK-30254][Connector/Pulsar] Resolve dependency convergence errors

2022-12-06 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 319992293faaf1c678b37904c3996c990f6e279d
Author: Martijn Visser 
AuthorDate: Wed Nov 30 20:11:23 2022 +0100

[FLINK-30254][Connector/Pulsar] Resolve dependency convergence errors
---
 pom.xml | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9c5040e..64472a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,10 +54,10 @@ under the License.
 1.45.1
 1.69
 1.4.0
-9.4.44.v20210927
+9.4.48.v20220622
 0.8.1
 4.13.2
-5.8.1
+5.9.1
 3.23.1
 0.22.0
 1.17.2
@@ -315,6 +315,13 @@ under the License.
 ${junit5.version}
 
 
+
+
+org.junit.platform
+junit-platform-commons
+1.9.1
+
+
 
 
 org.checkerframework



[flink-kubernetes-operator] branch main updated (2c207edf -> 41b9f4fc)

2022-12-06 Thread mbalassi
This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


from 2c207edf [FLINK-30154] Allow to set flinkConfiguration in SessionJob
 add 41b9f4fc [FLINK-30280] Document logging configuration behaviour

No new revisions were added by this update.

Summary of changes:
 helm/flink-kubernetes-operator/values.yaml | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)



[flink-kubernetes-operator] branch main updated: [FLINK-30287] Delete job manager deployment first when cleaning up standalone cluster

2022-12-06 Thread morhidi
This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new f3db414f [FLINK-30287] Delete job manager deployment first when 
cleaning up standalone cluster
f3db414f is described below

commit f3db414ff2673d69aa113d63c996912fb9d57bc4
Author: Steven Zhang 
AuthorDate: Fri Dec 2 11:28:23 2022 -0800

[FLINK-30287] Delete job manager deployment first when cleaning up 
standalone cluster
---
 .../flink/kubernetes/operator/service/StandaloneFlinkService.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index a9a5c21f..ad3e190e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -134,20 +134,20 @@ public class StandaloneFlinkService extends 
AbstractFlinkService {
 final String clusterId = meta.getName();
 final String namespace = meta.getNamespace();
 
-LOG.info("Deleting Flink Standalone cluster TM resources");
+LOG.info("Deleting Flink Standalone cluster JM resources");
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-
.withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+
.withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
 .delete();
 
-LOG.info("Deleting Flink Standalone cluster JM resources");
+LOG.info("Deleting Flink Standalone cluster TM resources");
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-
.withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
+
.withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
 .delete();
 
 if (deleteHaConfigmaps) {



[flink-kubernetes-operator] branch main updated: [FLINK-30046] Support for operator docker image digest

2022-12-06 Thread morhidi
This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 03654167 [FLINK-30046] Support for operator docker image digest
03654167 is described below

commit 0365416773390ae7a4946ba7180abb9e66b0dd86
Author: Marton Balassi 
AuthorDate: Tue Dec 6 16:25:07 2022 +0100

[FLINK-30046] Support for operator docker image digest
---
 docs/content/docs/operations/helm.md |  1 +
 helm/flink-kubernetes-operator/templates/_helpers.tpl| 11 +++
 helm/flink-kubernetes-operator/templates/flink-operator.yaml |  4 ++--
 helm/flink-kubernetes-operator/values.yaml   |  2 ++
 4 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/operations/helm.md 
b/docs/content/docs/operations/helm.md
index 1aaca026..3c5969ba 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -63,6 +63,7 @@ The configurable parameters of the Helm chart and which 
default values as detail
 | image.repository | The image repository of flink-kubernetes-operator. | 
ghcr.io/apache/flink-kubernetes-operator |
 | image.pullPolicy | The image pull policy of flink-kubernetes-operator. | 
IfNotPresent |
 | image.tag | The image tag of flink-kubernetes-operator. | latest |
+| image.digest | The image digest of flink-kubernetes-operator. If set then it 
takes precedence and the image tag will be ignored. | |
 | replicas | Operator replica count. Must be 1 unless leader election is 
configured. | 1 |
 | strategy.type | Operator pod upgrade strategy. Must be Recreate unless 
leader election is configured. | Recreate |
 | rbac.create | Whether to enable RBAC to create for said namespaces. | true |
diff --git a/helm/flink-kubernetes-operator/templates/_helpers.tpl 
b/helm/flink-kubernetes-operator/templates/_helpers.tpl
index aa53cb3d..3456ab46 100644
--- a/helm/flink-kubernetes-operator/templates/_helpers.tpl
+++ b/helm/flink-kubernetes-operator/templates/_helpers.tpl
@@ -67,6 +67,17 @@ Selector labels
 app.kubernetes.io/name: {{ include "flink-operator.name" . }}
 {{- end }}
 
+{{/*
+Create the path of the operator image to use
+*/}}
+{{- define "flink-operator.imagePath" -}}
+{{- if .Values.image.digest }}
+{{- .Values.image.repository }}@{{ .Values.image.digest }}
+{{- else }}
+{{- .Values.image.repository }}:{{ default .Chart.AppVersion .Values.image.tag 
}}
+{{- end }}
+{{- end }}
+
 {{/*
 Create the name of the operator role to use
 */}}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml 
b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index e8caa574..f94c2198 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -63,7 +63,7 @@ spec:
   serviceAccountName: {{ include "flink-operator.serviceAccountName" . }}
   containers:
 - name: {{ .Chart.Name }}
-  image: "{{ .Values.image.repository }}:{{ .Values.image.tag | 
default .Chart.AppVersion }}"
+  image: {{ include "flink-operator.imagePath" . }}
   imagePullPolicy: {{ .Values.image.pullPolicy }}
   command: ["/docker-entrypoint.sh", "operator"]
   ports:
@@ -127,7 +127,7 @@ spec:
   {{- end }}
 {{- if eq (include "flink-operator.webhook-enabled" .) "true" }}
 - name: flink-webhook
-  image: "{{ .Values.image.repository }}:{{ .Values.image.tag | 
default .Chart.AppVersion }}"
+  image: {{ include "flink-operator.imagePath" . }}
   imagePullPolicy: {{ .Values.image.pullPolicy }}
   command: ["/docker-entrypoint.sh", "webhook"]
   env:
diff --git a/helm/flink-kubernetes-operator/values.yaml 
b/helm/flink-kubernetes-operator/values.yaml
index e55ba519..32517cc4 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -26,6 +26,8 @@ image:
   repository: flink-kubernetes-operator
   pullPolicy: IfNotPresent
   tag: latest
+  # If image digest is set then it takes precedence and the image tag will be 
ignored
+  # digest: ""
 
 imagePullSecrets: []
 



[flink-kubernetes-operator] branch main updated: [FLINK-30287] add unit test

2022-12-06 Thread morhidi
This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new ca8c5112 [FLINK-30287] add unit test
ca8c5112 is described below

commit ca8c5112af4173c68caa91f6470bac360fc0374f
Author: Steven Zhang 
AuthorDate: Tue Dec 6 15:15:26 2022 -0800

[FLINK-30287] add unit test
---
 .../flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index ff68c6fc..b406220f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -77,9 +77,13 @@ public class StandaloneFlinkServiceTest {
 
 assertEquals(2, deployments.size());
 
+var requestsBeforeDelete = mockServer.getRequestCount();
 flinkStandaloneService.deleteClusterDeployment(
 flinkDeployment.getMetadata(), flinkDeployment.getStatus(), 
false);
 
+assertEquals(2, mockServer.getRequestCount() - requestsBeforeDelete);
+
assertTrue(mockServer.getLastRequest().getPath().contains("taskmanager"));
+
 deployments = kubernetesClient.apps().deployments().list().getItems();
 
 assertEquals(0, deployments.size());



[flink-kubernetes-operator] branch main updated: Add OLM bundle generator

2022-12-06 Thread morhidi
This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new 66953e01 Add OLM bundle generator
66953e01 is described below

commit 66953e01dcfa9ed1b31a7b6b854373cce477a8db
Author: ted chang 
AuthorDate: Wed Nov 30 13:37:13 2022 -0800

Add OLM bundle generator

Signed-off-by: ted chang 
---
 tools/olm/AutomatedOLMBuildAndDeploy.sh|  86 
 tools/olm/README.md| 144 ++
 ...-kubernetes-operator.clusterserviceversion.yaml | 216 +
 tools/olm/csv-template/bases/initContainer.yaml|  40 
 tools/olm/csv-template/bundle.Dockerfile   |  37 
 tools/olm/csv-template/catalog.Dockerfile  |  33 
 tools/olm/docker-entry.sh  | 114 +++
 tools/olm/env.sh   |  42 
 tools/olm/generate-olm-bundle.sh   | 153 +++
 tools/olm/install-prereq.sh|  97 +
 tools/olm/utils.Dockerfile |  43 
 11 files changed, 1005 insertions(+)

diff --git a/tools/olm/AutomatedOLMBuildAndDeploy.sh 
b/tools/olm/AutomatedOLMBuildAndDeploy.sh
new file mode 100755
index ..359388a5
--- /dev/null
+++ b/tools/olm/AutomatedOLMBuildAndDeploy.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+set +x
+# Script that Automates the OLM Creation and test of Flink Operator
+
+# Run the generate-olm-bundle.sh script
+./generate-olm-bundle.sh
+
+# Reset Cluster
+kind delete cluster
+kind create cluster
+kubectl cluster-info --context kind-kind
+# operator-sdk olm install
+curl -sL 
https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.22.0/install.sh
 | bash -s v0.22.0
+
+# Deploy the catalog src
+cat 

[flink-table-store] branch master updated: [FLINK-30288] Use PredicateVisitor to convert predicate for orc

2022-12-06 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new f32ce6e9 [FLINK-30288] Use PredicateVisitor to convert predicate for 
orc
f32ce6e9 is described below

commit f32ce6e99f9ced265abbd57ce7af4c2e184c20cf
Author: shammon 
AuthorDate: Wed Dec 7 11:32:11 2022 +0800

[FLINK-30288] Use PredicateVisitor to convert predicate for orc

This closes #419
---
 .../table/store/format/orc/OrcFileFormat.java  |   7 +-
 .../table/store/format/orc/OrcFilterConverter.java | 246 -
 .../format/orc/OrcPredicateFunctionVisitor.java| 215 ++
 .../store/format/orc/OrcFilterConverterTest.java   |   4 +-
 4 files changed, 220 insertions(+), 252 deletions(-)

diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
index 663260ab..2c68889f 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFileFormat.java
@@ -87,10 +87,9 @@ public class OrcFileFormat extends FileFormat {
 
 if (filters != null) {
 for (Predicate pred : filters) {
-OrcFilters.Predicate orcPred = 
OrcFilterConverter.toOrcPredicate(pred);
-if (orcPred != null) {
-orcPredicates.add(orcPred);
-}
+Optional orcPred =
+pred.visit(OrcPredicateFunctionVisitor.VISITOR);
+orcPred.ifPresent(orcPredicates::add);
 }
 }
 
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
deleted file mode 100644
index acccd161..
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/OrcFilterConverter.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.format.orc;
-
-import org.apache.flink.orc.OrcFilters;
-import org.apache.flink.orc.OrcFilters.Predicate;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
-import org.apache.flink.table.store.file.predicate.Equal;
-import org.apache.flink.table.store.file.predicate.GreaterOrEqual;
-import org.apache.flink.table.store.file.predicate.GreaterThan;
-import org.apache.flink.table.store.file.predicate.IsNotNull;
-import org.apache.flink.table.store.file.predicate.IsNull;
-import org.apache.flink.table.store.file.predicate.LeafFunction;
-import org.apache.flink.table.store.file.predicate.LeafPredicate;
-import org.apache.flink.table.store.file.predicate.LessOrEqual;
-import org.apache.flink.table.store.file.predicate.LessThan;
-import org.apache.flink.table.store.file.predicate.NotEqual;
-import org.apache.flink.table.store.file.predicate.Or;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.util.function.TriFunction;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.sql.Date;
-import java.time.LocalDate;
-import java.util.function.Function;
-
-/** Utility class that provides helper methods to work with Orc Filter 
PushDown. */
-public class OrcFilterConverter {
-
-private static final ImmutableMap<
-Class, Function>
-FILTERS =
-new ImmutableMap.Builder<
-Class,
-Function>()
-.put(IsNull.class, 
OrcFilterConverter::

[flink] branch master updated: [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 4c67f8fca52 [FLINK-30189][runtime] HsSubpartitionFileReader may load 
data that has been consumed from memory
4c67f8fca52 is described below

commit 4c67f8fca529a72389d69990307bbf78fcd3d99d
Author: Weijie Guo 
AuthorDate: Tue Dec 6 19:24:36 2022 +0800

[FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been 
consumed from memory

This closes #21415
---
 .../hybrid/HsSubpartitionFileReaderImpl.java   | 25 ++-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 48 --
 2 files changed, 50 insertions(+), 23 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index e40e917cfa1..c190690d45c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -265,19 +265,24 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
 
 private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
 throws Throwable {
-if (loadedBuffers.isEmpty()) {
-return Optional.empty();
-}
-
 BufferIndexOrError peek = loadedBuffers.peek();
-
-if (peek.getThrowable().isPresent()) {
-throw peek.getThrowable().get();
-} else if (peek.getIndex() != expectedBufferIndex) {
-return Optional.empty();
+while (peek != null) {
+if (peek.getThrowable().isPresent()) {
+throw peek.getThrowable().get();
+} else if (peek.getIndex() == expectedBufferIndex) {
+break;
+} else if (peek.getIndex() > expectedBufferIndex) {
+return Optional.empty();
+} else if (peek.getIndex() < expectedBufferIndex) {
+// Because the update of consumption progress may be delayed, 
there is a
+// very small probability to load the buffer that has been 
consumed from memory.
+// Skip these buffers directly to avoid repeated consumption.
+loadedBuffers.poll();
+peek = loadedBuffers.peek();
+}
 }
 
-return Optional.of(peek);
+return Optional.ofNullable(peek);
 }
 
 private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index cccf9f5d93e..b8f5d206a78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -398,22 +398,20 @@ class HsSubpartitionFileReaderImplTest {
 // if no preload data in file reader, return Optional.empty.
 assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
 
-// buffers in file: (0-0, 0-1)
-writeDataToFile(0, 0, 0, 2);
+// buffers in file: (0-0, 0-1, 0-2)
+writeDataToFile(0, 0, 0, 3);
 
-Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+Queue<MemorySegment> memorySegments = createsMemorySegments(3);
 subpartitionFileReader.prepareForScheduling();
 // trigger reading, add buffer to queue.
 subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
 
-// if nextBufferToConsume is not equal to peek elements index, return 
Optional.empty.
-assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
-
+// if nextBufferToConsume is equal to peek elements index.
 assertThat(subpartitionFileReader.consumeBuffer(0))
 .hasValueSatisfying(
 (bufferAndBacklog -> {
 assertThat(bufferAndBacklog.getNextDataType())
-.isEqualTo(DataType.EVENT_BUFFER);
+.isEqualTo(DataType.DATA_BUFFER);
 
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
 // first buffer's data is 0.
 assertThat(
@@ -424,6 +422,26 @@ class HsSubpartitionFileReaderImplTest {

[flink] branch release-1.16 updated: [FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been consumed from memory

2022-12-06 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
 new a9e65bc2377 [FLINK-30189][runtime] HsSubpartitionFileReader may load 
data that has been consumed from memory
a9e65bc2377 is described below

commit a9e65bc2377ee7a4b3599b58a58ff0301b79c5d8
Author: Weijie Guo 
AuthorDate: Tue Dec 6 19:24:36 2022 +0800

[FLINK-30189][runtime] HsSubpartitionFileReader may load data that has been 
consumed from memory

This closes #21415
---
 .../hybrid/HsSubpartitionFileReaderImpl.java   | 25 ++-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 48 --
 2 files changed, 50 insertions(+), 23 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index e6dc7122c5e..88d730b14ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -261,19 +261,24 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
 
 private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex)
 throws Throwable {
-if (loadedBuffers.isEmpty()) {
-return Optional.empty();
-}
-
 BufferIndexOrError peek = loadedBuffers.peek();
-
-if (peek.getThrowable().isPresent()) {
-throw peek.getThrowable().get();
-} else if (peek.getIndex() != expectedBufferIndex) {
-return Optional.empty();
+while (peek != null) {
+if (peek.getThrowable().isPresent()) {
+throw peek.getThrowable().get();
+} else if (peek.getIndex() == expectedBufferIndex) {
+break;
+} else if (peek.getIndex() > expectedBufferIndex) {
+return Optional.empty();
+} else if (peek.getIndex() < expectedBufferIndex) {
+// Because the update of consumption progress may be delayed, 
there is a
+// very small probability to load the buffer that has been 
consumed from memory.
+// Skip these buffers directly to avoid repeated consumption.
+loadedBuffers.poll();
+peek = loadedBuffers.peek();
+}
 }
 
-return Optional.of(peek);
+return Optional.ofNullable(peek);
 }
 
 private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 03a560589d6..411a3131975 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -398,22 +398,20 @@ class HsSubpartitionFileReaderImplTest {
 // if no preload data in file reader, return Optional.empty.
 assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent();
 
-// buffers in file: (0-0, 0-1)
-writeDataToFile(0, 0, 0, 2);
+// buffers in file: (0-0, 0-1, 0-2)
+writeDataToFile(0, 0, 0, 3);
 
-Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+Queue<MemorySegment> memorySegments = createsMemorySegments(3);
 subpartitionFileReader.prepareForScheduling();
 // trigger reading, add buffer to queue.
 subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {});
 
-// if nextBufferToConsume is not equal to peek elements index, return 
Optional.empty.
-assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent();
-
+// if nextBufferToConsume is equal to peek elements index.
 assertThat(subpartitionFileReader.consumeBuffer(0))
 .hasValueSatisfying(
 (bufferAndBacklog -> {
 assertThat(bufferAndBacklog.getNextDataType())
-.isEqualTo(DataType.EVENT_BUFFER);
+.isEqualTo(DataType.DATA_BUFFER);
 
assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
 // first buffer's data is 0.
 assertThat(
@@ -424,6 +422,26 @@ class HsSubpartitionFileReaderImplTest {