(flink) branch master updated (74864b0b376 -> 502d9673f56)

2024-09-30 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 74864b0b376 [FLINK-36369][table] Replace all class annotations in the 
legacy package with Internal in table modules (table-common, table-api-java and 
table-api-java-bridge)
 add d2ac6bda28c [FLINK-14068][streaming] Removes non-deprecated API that 
uses org.apache.flink.streaming.api.windowing.time.Time;
 add 502d9673f56 [FLINK-14068][core] Fixes docs that was missed in the core 
module PR of FLINK-14068

No new revisions were added by this update.

Summary of changes:
 README.md  |   2 +-
 .../datastream/event-time/generating_watermarks.md |   6 +-
 .../docs/dev/datastream/execution/parallel.md  |  10 +-
 .../content.zh/docs/dev/datastream/experimental.md |   4 +-
 .../docs/dev/datastream/operators/joining.md   |  32 ++---
 .../docs/dev/datastream/operators/overview.md  |  24 ++--
 .../docs/dev/datastream/operators/windows.md   |  76 +--
 docs/content.zh/docs/dev/datastream/overview.md|   8 +-
 docs/content.zh/docs/learn-flink/event_driven.md   |   6 +-
 .../docs/learn-flink/streaming_analytics.md|  16 +--
 docs/content.zh/docs/libs/state_processor_api.md   |   6 +-
 .../docs/ops/state/task_failure_recovery.md|   6 +-
 .../datastream/event-time/generating_watermarks.md |   6 +-
 .../docs/dev/datastream/execution/parallel.md  |  10 +-
 docs/content/docs/dev/datastream/experimental.md   |   4 +-
 .../docs/dev/datastream/operators/joining.md   |  32 ++---
 .../docs/dev/datastream/operators/overview.md  |  24 ++--
 .../docs/dev/datastream/operators/windows.md   |  82 ++--
 docs/content/docs/dev/datastream/overview.md   |   8 +-
 docs/content/docs/learn-flink/event_driven.md  |   6 +-
 .../docs/learn-flink/streaming_analytics.md|  14 +--
 docs/content/docs/libs/state_processor_api.md  |   6 +-
 .../streaming/tests/PeriodicStreamingJob.java  |   4 +-
 .../tests/DataStreamAllroundTestJobFactory.java|   7 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java|   4 +-
 .../java/org/apache/flink/cep/pattern/Pattern.java |   6 +-
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java|   3 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java|   5 +-
 .../state/api/SavepointWindowReaderITCase.java |  34 ++---
 .../state/api/SavepointWriterWindowITCase.java |  18 +--
 .../flink/state/api/input/WindowReaderTest.java|  12 +-
 .../flink/streaming/api/TimeCharacteristic.java|   7 +-
 .../api/datastream/AllWindowedStream.java  |  16 ---
 .../streaming/api/datastream/CoGroupedStreams.java |  50 +---
 .../flink/streaming/api/datastream/DataStream.java |  59 -
 .../streaming/api/datastream/JoinedStreams.java|  50 +---
 .../streaming/api/datastream/KeyedStream.java  |  72 ---
 .../streaming/api/datastream/WindowedStream.java   |  16 ---
 .../assigners/SlidingEventTimeWindows.java |  40 --
 .../assigners/SlidingProcessingTimeWindows.java|  40 --
 .../assigners/TumblingEventTimeWindows.java|  53 
 .../assigners/TumblingProcessingTimeWindows.java   |  56 -
 .../flink/streaming/api/windowing/time/Time.java   | 139 -
 .../operators/windowing/WindowOperatorBuilder.java |   7 --
 .../apache/flink/streaming/api/TypeFillTest.java   |   9 +-
 .../api/operators/StateDescriptorPassingTest.java  |  14 +--
 .../windowing/SlidingEventTimeWindowsTest.java |  35 --
 .../SlidingProcessingTimeWindowsTest.java  |  35 --
 .../windowing/TimeWindowTranslationTest.java   |  34 +++--
 .../windowing/TumblingEventTimeWindowsTest.java|  27 ++--
 .../TumblingProcessingTimeWindowsTest.java |  25 ++--
 .../BoundedOutOfOrdernessTimestampExtractor.java   |  10 --
 .../assigners/EventTimeSessionWindows.java |  14 ---
 .../assigners/ProcessingTimeSessionWindows.java|  14 ---
 .../api/windowing/evictors/TimeEvictor.java|  26 
 .../triggers/ContinuousEventTimeTrigger.java   |  13 --
 .../triggers/ContinuousProcessingTimeTrigger.java  |  13 --
 .../apache/flink/streaming/api/DataStreamTest.java |   3 +-
 ...oundedOutOfOrdernessTimestampExtractorTest.java |  11 +-
 .../windowing/AllWindowTranslationTest.java|  73 +--
 .../windowing/ContinuousEventTimeTriggerTest.java  |  15 +--
 .../ContinuousProcessingTimeTriggerTest.java   |  14 +--
 .../windowing/EventTimeSessionWindowsTest.java |  20 +--
 .../windowing/EvictingWindowOperatorTest.java  |  13 +-
 .../operators/windowing/MergingWindowSetTest.java  |  18 +--
 .../ProcessingTimeSessionWindowsTest.java  |  20 +--
 .../windowing/WindowOperatorMigrationTest.java |  35 +++---
 .../operators/windowing/WindowOperatorTest.java|  76 ++-
 .../operators/windowing

(flink) 02/02: [FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the ComponentMainThreadExecutor that's used in the corresponding test rather than the test's main thread

2024-09-24 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01d62d3a1958b4bdb4e31ff001d6630e348cd9e4
Author: Matthias Pohl 
AuthorDate: Mon Sep 23 14:37:26 2024 +0200

[FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the 
ComponentMainThreadExecutor that's used in the corresponding test rather than 
the test's main thread

This issue was introduced by FLINK-36168 where I added proper shutdown logic
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 43 +++---
 1 file changed, 22 insertions(+), 21 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 6b3f7acb19e..3a223cb4b8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -158,7 +158,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
@@ -412,7 +411,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = new Configuration();
 configuration.set(
@@ -470,7 +469,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = new Configuration();
 configuration.set(
@@ -695,7 +694,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = 
createConfigurationWithNoTimeouts();
 configuration.set(
@@ -784,7 +783,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 scheduler =
 new AdaptiveSchedulerBuilder(
@@ -805,7 +804,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = new Configuration();
 configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
@@ -834,7 +833,7 @@ public class AdaptiveSchedulerTest {
 final JobGraph jobGraph = createJobGraph();
 
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = new Configuration();
 configuration.set(
@@ -929,7 +928,7 @@ public class AdaptiveSchedulerTest {
 void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
 final JobGraph jobGraph = createJobGraph();
 final DefaultDeclarativeSlotPool declarativeSlotPool =
-createDeclarativeSlotPool(jobGraph.getJobID());
+createDeclarativeSlotPool(jobGraph.getJobID(), 
singleThreadMainThreadExecutor);
 
 final Configuration configuration = new Configuration();
 configuration.set(
@@ -1074,7 +1073,7 @@ public class AdaptiveSchedulerTest {
 final JobGrap

(flink) 01/02: [hotfix][test] Changes the way the thread name of the ComponentMainThreadExecutorServiceAdapter is created

2024-09-24 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b15a704b4f70590a6cefcf3bc6fcd412941717f9
Author: Matthias Pohl 
AuthorDate: Mon Sep 23 14:25:33 2024 +0200

[hotfix][test] Changes the way the thread name of the 
ComponentMainThreadExecutorServiceAdapter is created

The old implementation generated thread names having the prefix multiple 
times.
---
 .../concurrent/ComponentMainThreadExecutorServiceAdapter.java | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index 690ccecf227..277e2c04627 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -78,7 +78,13 @@ public class ComponentMainThreadExecutorServiceAdapter 
implements ComponentMainT
 @Nonnull ScheduledExecutorService singleThreadExecutor) {
 final Thread thread =
 CompletableFuture.supplyAsync(Thread::currentThread, 
singleThreadExecutor).join();
-thread.setName(String.format("ComponentMainThread-%s", 
thread.getName()));
+final String testMainThreadNamePrefix = "ComponentMainThread";
+if (!thread.getName().contains(testMainThreadNamePrefix)) {
+// we have to check the current name first because this instance 
can be reused as a
+// ScheduledExecutorService
+thread.setName(String.format("%s-%s", testMainThreadNamePrefix, 
thread.getName()));
+}
+
 return new 
ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
 }
 



(flink) branch master updated (8bb2f4d7479 -> 01d62d3a195)

2024-09-24 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 8bb2f4d7479 [FLINK-35411][State] Support process write state requests 
within coordinator thread
 new b15a704b4f7 [hotfix][test] Changes the way the thread name of the 
ComponentMainThreadExecutorServiceAdapter is created
 new 01d62d3a195 [FLINK-36299][runtime] Makes DeclarativeSlotPool rely on 
the ComponentMainThreadExecutor that's used in the corresponding test rather 
than the test's main thread

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ComponentMainThreadExecutorServiceAdapter.java |  8 +++-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 43 +++---
 2 files changed, 29 insertions(+), 22 deletions(-)



(flink) branch master updated (300e1ea1f2c -> 32dc6c019c0)

2024-09-23 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 300e1ea1f2c [hotfix] Fix UnifiedSinkMigrationITCase
 new 93aa8b6389b [FLINK-36295] [runtime] Test 
AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() 
waits until the desired parallelism is reached
 new 8e7f661ece2 [hotfix] UnsupportedOperationException in 
AdaptiveSchedulerClusterITCase fix
 new 32dc6c019c0 [hotfix] Additional logs in AdaptiveScheduler components

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/adaptive/DefaultStateTransitionManager.java |  2 ++
 .../org/apache/flink/runtime/scheduler/adaptive/State.java|  9 -
 .../scheduler/adaptive/AdaptiveSchedulerClusterITCase.java| 11 +++
 3 files changed, 21 insertions(+), 1 deletion(-)



(flink) 02/03: [hotfix] UnsupportedOperationException in AdaptiveSchedulerClusterITCase fix

2024-09-23 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e7f661ece2354197d4a1e2051df5cb626c19469
Author: Zdenek Tison 
AuthorDate: Thu Sep 19 10:45:58 2024 +0200

[hotfix] UnsupportedOperationException in AdaptiveSchedulerClusterITCase fix
---
 .../runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java   | 5 +
 1 file changed, 5 insertions(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
index 45da326b6f9..790601b56d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
@@ -258,6 +258,11 @@ public class AdaptiveSchedulerClusterITCase {
 public Future notifyCheckpointCompleteAsync(long checkpointId) {
 return CompletableFuture.completedFuture(null);
 }
+
+@Override
+public Future notifyCheckpointSubsumedAsync(long checkpointId) {
+return CompletableFuture.completedFuture(null);
+}
 }
 
 private JobGraph createBlockingJobGraph(int parallelism) throws 
IOException {



(flink) 03/03: [hotfix] Additional logs in AdaptiveScheduler components

2024-09-23 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32dc6c019c036b7fea4532bf429d00bfc9b75c78
Author: Zdenek Tison 
AuthorDate: Fri Sep 20 15:33:50 2024 +0200

[hotfix] Additional logs in AdaptiveScheduler components
---
 .../scheduler/adaptive/DefaultStateTransitionManager.java| 2 ++
 .../java/org/apache/flink/runtime/scheduler/adaptive/State.java  | 9 -
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index da3ccab27c2..4c7be2ed349 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -109,11 +109,13 @@ public class DefaultStateTransitionManager implements 
StateTransitionManager {
 
 @Override
 public void onChange() {
+LOG.debug("OnChange event received in phase {}.", getPhase());
 phase.onChange();
 }
 
 @Override
 public void onTrigger() {
+LOG.debug("OnTrigger event received in phase {}.", getPhase());
 phase.onTrigger();
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 2a4bb966042..7d7de39a439 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -103,7 +103,14 @@ interface State extends LabeledGlobalFailureHandler {
 Class clazz, ThrowingConsumer action, String 
debugMessage) throws E {
 tryRun(
 clazz,
-action,
+x -> {
+getLogger()
+.debug(
+"Running '{}' in state {}.",
+debugMessage,
+this.getClass().getSimpleName());
+ThrowingConsumer.unchecked(action).accept(x);
+},
 logger ->
 logger.debug(
 "Cannot run '{}' because the actual state is 
{} and not {}.",



(flink) 01/03: [FLINK-36295] [runtime] Test AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() waits until the desired parallelism is reached

2024-09-23 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93aa8b6389b87d68464126f6c53b62fcfcc7f231
Author: Zdenek Tison 
AuthorDate: Thu Sep 19 10:32:19 2024 +0200

[FLINK-36295] [runtime] Test 
AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() 
waits until the desired parallelism is reached
---
 .../runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java  | 6 ++
 1 file changed, 6 insertions(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
index 2a688f12d6d..45da326b6f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
@@ -187,6 +187,12 @@ public class AdaptiveSchedulerClusterITCase {
 
 miniCluster.submitJob(jobGraph).join();
 
+// wait until the desired parallelism is reached
+waitUntilParallelismForVertexReached(
+jobGraph.getJobID(),
+JOB_VERTEX_ID,
+NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS);
+
 // wait until some checkpoints have been completed
 CommonTestUtils.waitUntilCondition(
 () ->



(flink) branch master updated: [FLINK-36015] [runtime] Align rescale parameters

2024-09-18 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ab3c3d9881 [FLINK-36015] [runtime] Align rescale parameters
6ab3c3d9881 is described below

commit 6ab3c3d9881b833278e29d205c0bca1acf575be5
Author: Zdenek Tison 
AuthorDate: Wed Sep 11 16:07:13 2024 +0200

[FLINK-36015] [runtime] Align rescale parameters
---
 .../generated/all_jobmanager_section.html  |  34 +++
 .../generated/expert_scheduling_section.html   |  34 +++
 .../generated/job_manager_configuration.html   |  34 +++
 .../flink/configuration/JobManagerOptions.java | 102 +++
 .../scheduler/adaptive/AdaptiveScheduler.java  | 109 +++--
 .../scheduler/adaptive/WaitingForResources.java|  21 ++--
 .../runtime/minicluster/MiniClusterITCase.java |   6 +-
 .../adaptive/AdaptiveSchedulerClusterITCase.java   |  10 +-
 .../adaptive/AdaptiveSchedulerSimpleITCase.java|   3 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  82 ++--
 .../test/checkpointing/AutoRescalingITCase.java|   4 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |   4 +-
 .../TaskManagerDisconnectOnShutdownITCase.java |   4 +-
 .../test/scheduling/RescaleOnCheckpointITCase.java |   6 +-
 .../java/org/apache/flink/yarn/YarnTestBase.java   |   3 +-
 15 files changed, 284 insertions(+), 172 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index bcb61cd1849..d1df33a72f3 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -8,6 +8,12 @@
 
 
 
+
+
jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling
+30 s
+Duration
+Determines the minimum time between scaling operations.
+
 
 
jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout
 1 min
@@ -15,35 +21,29 @@
 Defines the duration the JobManager delays the scaling 
operation after a resource change if only sufficient resources are available. 
The scaling operation is performed immediately if the resources have changed 
and the desired resources are available. The timeout begins as soon as either 
the available resources or the job's resource requirements are changed.The resource requirements of a running job can be changed using the 2
+Integer
+The number of consecutive failed checkpoints that will trigger 
rescaling even in the absence of a completed checkpoint.
+
+
+
jobmanager.adaptive-scheduler.rescale-trigger.max-delay
 (none)
 Duration
-The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if 
checkpointing is enabled).
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures if 
checkpointing is enabled).
 
 
-
jobmanager.adaptive-scheduler.resource-stabilization-timeout
+
jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout
 10 s
 Duration
-The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the desired but sufficient resources are 
available. The timeout starts once sufficient resources for running the job are 
available. Once this timeout has passed, the job will start executing with the 
available resources.If scheduler-mode is configured to REACTIVE, this configuration value will defaul 
[...]
+The resource stabilization timeout defines the time the 
JobManager will wait if fewer than the desired but sufficient resources are 
available during job submission. The timeout starts once sufficient resources 
for running the job are available. Once this timeout has passed, the job will 
start executing with the available resources.If scheduler-mode is configured to REACTIVE, this configura [...]
 
 
-
jobmanager.adaptive-scheduler.resource-wait-timeout
+
jobmanager.adaptive-scheduler.submission.resource-wait-timeout
 5 min
 Duration
 The maximum time the JobManager will wait to acquir

(flink) branch master updated: [FLINK-36016] [runtime] Synchronize initialization time and clock usage in DefaultStateTransitionManager

2024-09-18 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 78c19a1278e [FLINK-36016] [runtime] Synchronize initialization time 
and clock usage in DefaultStateTransitionManager
78c19a1278e is described below

commit 78c19a1278edf54f679b196ec5ef132dfd4b63e4
Author: Zdenek Tison 
AuthorDate: Tue Sep 10 13:25:14 2024 +0200

[FLINK-36016] [runtime] Synchronize initialization time and clock usage in 
DefaultStateTransitionManager
---
 .../scheduler/adaptive/AdaptiveScheduler.java  | 19 +-
 .../adaptive/DefaultStateTransitionManager.java| 27 +++---
 .../runtime/scheduler/adaptive/Executing.java  | 17 -
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  9 ++---
 .../DefaultStateTransitionManagerTest.java |  5 ++-
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 41 +-
 6 files changed, 46 insertions(+), 72 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 60c605e559f..d993d1d04aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -151,6 +151,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static 
org.apache.flink.configuration.JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
@@ -192,16 +193,16 @@ public class AdaptiveScheduler
  *
  * @see
  * 
DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context,
- * Duration, Duration, Duration, Temporal)
+ * Supplier, Duration, Duration, Duration)
  */
 @FunctionalInterface
 interface StateTransitionManagerFactory {
 StateTransitionManager create(
 StateTransitionManager.Context context,
+Supplier clock,
 Duration cooldownTimeout,
 Duration resourceStabilizationTimeout,
-Duration maximumDelayForTrigger,
-Temporal lastStateTransition);
+Duration maximumDelayForTrigger);
 }
 
 /**
@@ -411,6 +412,8 @@ public class AdaptiveScheduler
 private final JobFailureMetricReporter jobFailureMetricReporter;
 private final boolean reportEventsAsSpans;
 
+private final Supplier clock = Instant::now;
+
 public AdaptiveScheduler(
 Settings settings,
 JobGraph jobGraph,
@@ -1155,10 +1158,10 @@ public class AdaptiveScheduler
 StateTransitionManager.Context ctx) {
 return stateTransitionManagerFactory.create(
 ctx,
+clock,
 Duration.ZERO, // skip cooldown phase
 settings.getResourceStabilizationTimeout(),
-Duration.ZERO, // trigger immediately once the stabilization 
phase is over
-Instant.now());
+Duration.ZERO); // trigger immediately once the stabilization 
phase is over
 }
 
 private void declareDesiredResources() {
@@ -1194,13 +1197,13 @@ public class AdaptiveScheduler
 }
 
 private StateTransitionManager createExecutingStateTransitionManager(
-StateTransitionManager.Context ctx, Instant lastRescaleTimestamp) {
+StateTransitionManager.Context ctx) {
 return stateTransitionManagerFactory.create(
 ctx,
+clock,
 settings.getScalingIntervalMin(),
 settings.getScalingResourceStabilizationTimeout(),
-settings.getMaximumDelayForTriggeringRescale(),
-lastRescaleTimestamp);
+settings.getMaximumDelayForTriggeringRescale());
 }
 
 @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index 27bfcfd183e..da3ccab27c2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -28,7 +28,6 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.time.Duration;
-import java.time.Instant;
 import

(flink) branch master updated (0d3be65c06b -> 15361471676)

2024-09-16 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 0d3be65c06b [hotfix][test] Adds back timestamp to log output
 add 15361471676 [FLINK-14068][core] Removes deprecated 
org.apache.flink.api.common.time.Time (#25250)

No new revisions were added by this update.

Summary of changes:
 .../docs/dev/datastream/fault-tolerance/state.md   |  16 +-
 .../docs/ops/state/task_failure_recovery.md|  14 +-
 .../docs/dev/datastream/fault-tolerance/state.md   |  18 +--
 .../docs/ops/state/task_failure_recovery.md|  14 +-
 .../ApplicationDispatcherBootstrap.java|   8 +-
 .../deployment/application/EmbeddedJobClient.java  |   8 +-
 .../application/JobStatusPollingUtils.java |   8 +-
 .../application/executors/EmbeddedExecutor.java|   7 +-
 .../executors/EmbeddedExecutorFactory.java |   6 +-
 .../client/program/rest/RestClusterClient.java |   3 +-
 .../connector/base/sink/AsyncSinkBaseITCase.java   |   5 +-
 .../file/sink/BatchExecutionFileSinkITCase.java|   5 +-
 .../sink/StreamingExecutionFileSinkITCase.java |   4 +-
 .../file/sink/writer/FileWriterBucketTest.java |   6 +-
 .../wikiedits/WikipediaEditsSourceTest.java|   8 +-
 .../common/restartstrategy/RestartStrategies.java  | 134 +---
 .../flink/api/common/state/StateTtlConfig.java |  33 
 .../org/apache/flink/api/common/time/Time.java | 172 -
 .../flink/configuration/ConfigurationUtils.java|  14 +-
 .../main/java/org/apache/flink/util/TimeUtils.java |  38 +
 .../flink/api/common/ExecutionConfigTest.java  |   4 +-
 .../eventtime/WatermarksWithIdlenessTest.java  |   2 +-
 .../api/common/state/StateDescriptorTest.java  |   4 +-
 .../flink/api/common/state/StateTtlConfigTest.java |  10 +-
 .../org/apache/flink/testutils/TestingUtils.java   |   7 +-
 .../flink/util/TimeUtilsPrettyPrintingTest.java|   3 +-
 .../java/org/apache/flink/util/TimeUtilsTest.java  |  11 --
 .../flink/connector/file/sink/FileSinkProgram.java |   6 +-
 .../flink/sql/tests/StreamSQLTestProgram.java  |   6 +-
 .../flink/streaming/tests/TtlTestConfig.java   |  11 +-
 .../tests/verify/AbstractTtlStateVerifier.java |   2 +-
 ...HighAvailabilityRecoverFromSavepointITCase.java |   2 +-
 flink-python/pyflink/common/restart_strategy.py|  16 +-
 .../datastream/connectors/tests/test_kafka.py  |   2 +
 .../pyflink/fn_execution/embedded/java_utils.py|   4 +-
 flink-python/pyflink/table/table_config.py |   6 +-
 flink-python/pyflink/util/java_utils.py|  16 +-
 .../org/apache/flink/python/util/ProtoUtils.java   |   4 +-
 .../flink/streaming/api/utils/ProtoUtilsTest.java  |   6 +-
 .../flink/runtime/rpc/pekko/PekkoRpcActorTest.java |   9 +-
 .../runtime/rpc/pekko/TimeoutCallStackTest.java|  11 +-
 .../apache/flink/runtime/rpc/RpcGatewayUtils.java  |   8 +-
 .../org/apache/flink/runtime/rpc/RpcUtils.java |   3 +-
 .../runtime/webmonitor/WebSubmissionExtension.java |   6 +-
 .../webmonitor/handlers/JarDeleteHandler.java  |   4 +-
 .../webmonitor/handlers/JarListHandler.java|   4 +-
 .../webmonitor/handlers/JarPlanHandler.java|   6 +-
 .../runtime/webmonitor/handlers/JarRunHandler.java |   4 +-
 .../webmonitor/handlers/JarUploadHandler.java  |   4 +-
 .../webmonitor/LeaderRetrievalHandlerTest.java |  17 +-
 .../runtime/webmonitor/WebMonitorUtilsTest.java|   4 +-
 .../webmonitor/WebSubmissionExtensionTest.java |   5 +-
 .../webmonitor/handlers/JarDeleteHandlerTest.java  |   4 +-
 .../handlers/JarHandlerParameterTest.java  |   6 +-
 .../runtime/webmonitor/handlers/JarHandlers.java   |   4 +-
 .../handlers/JarRunHandlerParameterTest.java   |   3 +-
 .../webmonitor/handlers/JarUploadHandlerTest.java  |   4 +-
 .../flink/runtime/dispatcher/Dispatcher.java   |  54 +++
 .../DispatcherCachedOperationsHandler.java |  10 +-
 .../runtime/dispatcher/DispatcherGateway.java  |  16 +-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   6 +-
 .../dispatcher/FileExecutionGraphInfoStore.java|  12 +-
 .../dispatcher/MemoryExecutionGraphInfoStore.java  |  12 +-
 .../flink/runtime/dispatcher/MiniDispatcher.java   |   8 +-
 .../dispatcher/TriggerCheckpointFunction.java  |   4 +-
 .../dispatcher/TriggerSavepointFunction.java   |   4 +-
 .../cleanup/CheckpointResourcesCleanupRunner.java  |  10 +-
 .../runtime/entrypoint/ClusterEntrypoint.java  |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java   |   6 +-
 .../executiongraph/DefaultExecutionGraph.java  |   6 +-
 .../DefaultExecutionGraphBuilder.java  |   4 +-
 .../flink/runtime/executiongraph/Execution.java|   6 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |   6 +-
 .../runtime/executiongraph/ExecutionVertex.java|   6

(flink) branch master updated (ed7f57c9ccb -> 0d3be65c06b)

2024-09-16 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from ed7f57c9ccb [FLINK-36258][config] set `HADOOP_CONF_DIR` to config 
hadoop in YarnFileStageTestS3ITCase
 new 0731f4f1e12 [FLINK-36279][runtime] Making desired resources being 
calculated based on all available slots for the job
 new 621ce3248a9 [hotfix][test] Adds some additional logging to 
RescaleOnCheckpointITCase
 new 0d3be65c06b [hotfix][test] Adds back timestamp to log output

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/adaptive/AdaptiveScheduler.java  |  4 +--
 .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++---
 .../src/test/resources/log4j2-test.properties  |  2 +-
 3 files changed, 36 insertions(+), 8 deletions(-)



(flink) 02/03: [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase

2024-09-16 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 621ce3248a927eab2886dd0c3e557fffc878be05
Author: Matthias Pohl 
AuthorDate: Mon Sep 16 12:10:09 2024 +0200

[hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase
---
 .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++---
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
index 60c2b19cca1..4febfe453cc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,6 +41,8 @@ import org.apache.flink.util.TestLoggerExtension;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Iterator;
@@ -51,6 +54,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(TestLoggerExtension.class)
 class RescaleOnCheckpointITCase {
 
+private static final Logger LOG = 
LoggerFactory.getLogger(RescaleOnCheckpointITCase.class);
+
 // Scaling down is used here because scaling up is not supported by the 
NumberSequenceSource
 // that's used in this test.
 private static final int NUMBER_OF_SLOTS = 4;
@@ -111,34 +116,59 @@ class RescaleOnCheckpointITCase {
 assertThat(jobVertexIterator.hasNext())
 .as("There needs to be at least one JobVertex.")
 .isTrue();
+final JobVertexID jobVertexId = jobVertexIterator.next().getID();
 final JobResourceRequirements jobResourceRequirements =
 JobResourceRequirements.newBuilder()
-.setParallelismForJobVertex(
-jobVertexIterator.next().getID(), 1, 
AFTER_RESCALE_PARALLELISM)
+.setParallelismForJobVertex(jobVertexId, 1, 
AFTER_RESCALE_PARALLELISM)
 .build();
 assertThat(jobVertexIterator.hasNext())
 .as("This test expects to have only one JobVertex.")
 .isFalse();
 
 restClusterClient.submitJob(jobGraph).join();
+
+final JobID jobId = jobGraph.getJobID();
 try {
-final JobID jobId = jobGraph.getJobID();
 
+LOG.info(
+"Waiting for job {} to reach parallelism of {} for vertex 
{}.",
+jobId,
+BEFORE_RESCALE_PARALLELISM,
+jobVertexId);
 waitForRunningTasks(restClusterClient, jobId, 
BEFORE_RESCALE_PARALLELISM);
 
+LOG.info(
+"Job {} reached parallelism of {} for vertex {}. Updating 
the vertex parallelism next to {}.",
+jobId,
+BEFORE_RESCALE_PARALLELISM,
+jobVertexId,
+AFTER_RESCALE_PARALLELISM);
 restClusterClient.updateJobResourceRequirements(jobId, 
jobResourceRequirements).join();
 
 // timeout to allow any unexpected rescaling to happen anyway
 Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
 
 // verify that the previous timeout didn't result in a change of 
parallelism
+LOG.info(
+"Checking that job {} hasn't changed its parallelism even 
after some delay, yet.",
+jobId);
 waitForRunningTasks(restClusterClient, jobId, 
BEFORE_RESCALE_PARALLELISM);
 
 miniCluster.triggerCheckpoint(jobId);
 
+LOG.info(
+"Waiting for job {} to reach parallelism of {} for vertex 
{}.",
+jobId,
+AFTER_RESCALE_PARALLELISM,
+jobVertexId);
 waitForRunningTasks(restClusterClient, jobId, 
AFTER_RESCALE_PARALLELISM);
 
-waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - 
AFTER_RESCALE_PARALLELISM);
+final int expectedFreeSlot

(flink) 01/03: [FLINK-36279][runtime] Making desired resources being calculated based on all available slots for the job

2024-09-16 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0731f4f1e122fb4e675464380300e7ffdd595d46
Author: Matthias Pohl 
AuthorDate: Mon Sep 16 12:09:35 2024 +0200

[FLINK-36279][runtime] Making desired resources being calculated based on 
all available slots for the job

FLINK-36014 aligned the triggering of the CreateExecutionGraph state within 
WaitingForResources and Executing state. Before that change, only 
WaitingForResources relied on this method. Relying on free slots was good 
enough because in WaitingForResources state, there are no slots allocated, yet.

Using this method for Executing state now as well changes this premise 
because there are slots allocated while checking the slot availability that 
would become available after the restart. Hence, considering these slots in the 
slot availability check is good enough. This will not break the premise for the 
WaitingForResources state.
---
 .../apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java| 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 5686c0e47ed..60c605e559f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1078,9 +1078,7 @@ public class AdaptiveScheduler
 
 @Override
 public boolean hasDesiredResources() {
-final Collection freeSlots =
-
declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation();
-return hasDesiredResources(desiredResources, freeSlots);
+return hasDesiredResources(desiredResources, 
declarativeSlotPool.getAllSlotsInformation());
 }
 
 @VisibleForTesting



(flink) 03/03: [hotfix][test] Adds back timestamp to log output

2024-09-16 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d3be65c06b7e8afa90e00628bc1c325953c597f
Author: Matthias Pohl 
AuthorDate: Mon Sep 16 12:45:32 2024 +0200

[hotfix][test] Adds back timestamp to log output

This was accidentally removed with the changes of FLINK-34417
---
 flink-tests/src/test/resources/log4j2-test.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-tests/src/test/resources/log4j2-test.properties 
b/flink-tests/src/test/resources/log4j2-test.properties
index 843e105b0ea..4dcd8178762 100644
--- a/flink-tests/src/test/resources/log4j2-test.properties
+++ b/flink-tests/src/test/resources/log4j2-test.properties
@@ -28,7 +28,7 @@ appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR
 appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n
+appender.testlogger.layout.pattern =  %-4r [%-32X{flink-job-id}] %c{0} [%t] 
%-5p %m%n
 
 logger.migration.name = org.apache.flink.test.migration
 logger.migration.level = INFO



(flink) branch master updated (8dc40ab70c9 -> 6a5fa9962a9)

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated 
JobExceptionsInfo. (#25248)
 new b79cf32cebe [hotfix] Fixes JobExceptionsInfoWithHistory equals method
 new 6a5fa9962a9 [FLINK-36147][runtime] Removes deprecated location field 
(was deprecated in 1.19)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/test/resources/rest_api_v1.snapshot|  6 
 .../rest/handler/job/JobExceptionsHandler.java |  3 --
 .../messages/JobExceptionsInfoWithHistory.java | 41 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 14 
 .../JobExceptionsInfoWithHistoryNoRootTest.java|  2 --
 5 files changed, 10 insertions(+), 56 deletions(-)



(flink) 01/02: [hotfix] Fixes JobExceptionsInfoWithHistory equals method

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b79cf32cebeb5044dcc339cc52a47b40a5498fd6
Author: Matthias Pohl 
AuthorDate: Fri Aug 23 22:27:45 2024 +0200

[hotfix] Fixes JobExceptionsInfoWithHistory equals method
---
 .../flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
index fbf17723f6e..1ca124cf01d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -287,7 +287,7 @@ public class JobExceptionsInfoWithHistory implements 
ResponseBody {
 && Objects.equals(failureLabels, that.failureLabels)
 && Objects.equals(taskName, that.taskName)
 && Objects.equals(location, that.location)
-&& Objects.equals(location, that.endpoint);
+&& Objects.equals(endpoint, that.endpoint);
 }
 
 @Override



(flink) 02/02: [FLINK-36147][runtime] Removes deprecated location field (was deprecated in 1.19)

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a5fa9962a90b663209a9eb2e910290b55eb97e1
Author: Matthias Pohl 
AuthorDate: Fri Aug 23 22:32:26 2024 +0200

[FLINK-36147][runtime] Removes deprecated location field (was deprecated in 
1.19)
---
 .../src/test/resources/rest_api_v1.snapshot|  6 
 .../rest/handler/job/JobExceptionsHandler.java |  3 --
 .../messages/JobExceptionsInfoWithHistory.java | 39 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 14 
 .../JobExceptionsInfoWithHistoryNoRootTest.java|  2 --
 5 files changed, 9 insertions(+), 55 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 995662900eb..9e9fe94da6a 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1989,9 +1989,6 @@
   "taskName" : {
 "type" : "string"
   },
-  "location" : {
-"type" : "string"
-  },
   "endpoint" : {
 "type" : "string"
   },
@@ -2022,9 +2019,6 @@
 "taskName" : {
   "type" : "string"
 },
-"location" : {
-  "type" : "string"
-},
 "endpoint" : {
   "type" : "string"
 },
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c8580fa753b..ac00137c630 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -195,7 +195,6 @@ public class JobExceptionsHandler
 historyEntry.getFailureLabels(),
 historyEntry.getFailingTaskName(),
 toString(historyEntry.getTaskManagerLocation()),
-toString(historyEntry.getTaskManagerLocation()),
 toTaskManagerId(historyEntry.getTaskManagerLocation()),
 concurrentExceptions);
 }
@@ -211,7 +210,6 @@ public class JobExceptionsHandler
 exceptionHistoryEntry.getFailureLabels(),
 null,
 null,
-null,
 null);
 }
 
@@ -224,7 +222,6 @@ public class JobExceptionsHandler
 exceptionHistoryEntry.getFailureLabels(),
 exceptionHistoryEntry.getFailingTaskName(),
 toString(exceptionHistoryEntry.getTaskManagerLocation()),
-toString(exceptionHistoryEntry.getTaskManagerLocation()),
 
toTaskManagerId(exceptionHistoryEntry.getTaskManagerLocation()));
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
index 1ca124cf01d..f1253592e1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java
@@ -154,7 +154,6 @@ public class JobExceptionsInfoWithHistory implements 
ResponseBody {
 public static final String FIELD_NAME_EXCEPTION_STACKTRACE = 
"stacktrace";
 public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = 
"timestamp";
 public static final String FIELD_NAME_TASK_NAME = "taskName";
-@Deprecated public static final String FIELD_NAME_LOCATION = 
"location";
 public static final String FIELD_NAME_ENDPOINT = "endpoint";
 public static final String FIELD_NAME_TASK_MANAGER_ID = 
"taskManagerId";
 public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels";
@@ -173,13 +172,6 @@ public class JobExceptionsInfoWithHistory implements 
ResponseBody {
 @Nullable
 private final String taskName;
 
-/** @deprecated Use {@link ExceptionInfo#endpoint} instead. */
-@Deprecated
-@JsonInclude(NON_NULL)
-@JsonProperty(FIELD_NAME_LOCATION)
-@Nullable
-private final String location;
-

(flink) branch master updated: [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated 
JobExceptionsInfo. (#25248)
8dc40ab70c9 is described below

commit 8dc40ab70c991d67949d3498b26dd6661e816312
Author: Matthias Pohl 
AuthorDate: Thu Sep 12 19:24:02 2024 +0200

[FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
---
 .../src/test/resources/rest_api_v1.snapshot|  36 
 .../rest/handler/job/JobExceptionsHandler.java |  47 -
 .../runtime/rest/messages/JobExceptionsInfo.java   | 230 -
 .../messages/JobExceptionsInfoWithHistory.java |  32 +--
 .../rest/handler/job/JobExceptionsHandlerTest.java |  72 ---
 .../JobExceptionsInfoWithHistoryNoRootTest.java|  22 --
 .../messages/JobExceptionsInfoWithHistoryTest.java |  22 --
 7 files changed, 40 insertions(+), 421 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 6fcee2ff9c1..995662900eb 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1961,42 +1961,6 @@
   "type" : "object",
   "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory",
   "properties" : {
-"root-exception" : {
-  "type" : "string"
-},
-"timestamp" : {
-  "type" : "integer"
-},
-"all-exceptions" : {
-  "type" : "array",
-  "items" : {
-"type" : "object",
-"id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo:ExecutionExceptionInfo",
-"properties" : {
-  "exception" : {
-"type" : "string"
-  },
-  "task" : {
-"type" : "string"
-  },
-  "location" : {
-"type" : "string"
-  },
-  "endpoint" : {
-"type" : "string"
-  },
-  "timestamp" : {
-"type" : "integer"
-  },
-  "taskManagerId" : {
-"type" : "string"
-  }
-}
-  }
-},
-"truncated" : {
-  "type" : "boolean"
-},
 "exceptionHistory" : {
   "type" : "object",
   "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 6d5f49d55b4..c8580fa753b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -20,15 +20,9 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -56,7 +50,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -130,47 +123,7 @@ public class JobExceptionsHandler
 Executio

(flink) branch master updated: [FLINK-36207][build] Disables deprecated API from japicmp

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 625c6774fc1 [FLINK-36207][build] Disables deprecated API from japicmp
625c6774fc1 is described below

commit 625c6774fc187aed9f875abb421dd82d6cd10548
Author: Matthias Pohl 
AuthorDate: Tue Sep 3 21:33:02 2024 +0200

[FLINK-36207][build] Disables deprecated API from japicmp

The japicmp plugin doesn't seem to work well with Scala annotations (more 
specifically @scala.deprecated). But we're planning to remove the Scala code as 
part of the 2.0 release, anyway. Hence, excluding all *scala files should be 
good enough.
---
 pom.xml | 4 
 1 file changed, 4 insertions(+)

diff --git a/pom.xml b/pom.xml
index 777fea41f31..fb56ce20f60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2332,6 +2332,8 @@ under the License.




+   
@java.lang.Deprecated
+   
*.scala

@org.apache.flink.annotation.Experimental

@org.apache.flink.annotation.PublicEvolving

@org.apache.flink.annotation.Internal
@@ -2341,6 +2343,8 @@ under the License.

org.apache.flink.configuration.Configuration#setBytes(java.lang.String,byte[])


org.apache.flink.configuration.ConfigConstants
+   
+   
org.apache.flink.api.common.eventtime.WatermarksWithIdleness


org.apache.flink.api.java.tuple.*

org.apache.flink.types.NullFieldException



(flink) branch master updated: [FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305)

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 83be264569d [FLINK-36168][runtime] Replaces goToFinished with the 
cancel method (#25305)
83be264569d is described below

commit 83be264569d3d8c66dc7b82c062e65f34e35d119
Author: Matthias Pohl 
AuthorDate: Thu Sep 12 17:44:32 2024 +0200

[FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305)

We have to transition to all the expected state transitions properly to 
handle all the cleanup. This also requires proper handling of the state 
transitions by the DummyState test implementations.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 89 --
 .../adaptive/WaitingForResourcesTest.java  | 15 ++--
 2 files changed, 57 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 49a58b286ac..c09fbfadcb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -143,6 +143,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -206,24 +207,40 @@ public class AdaptiveSchedulerTest {
 }
 
 private static void closeInExecutorService(
-@Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor 
executor) {
+@Nullable AdaptiveScheduler scheduler, Executor executor) {
 if (scheduler != null) {
 final CompletableFuture closeFuture = new 
CompletableFuture<>();
 executor.execute(
 () -> {
 try {
-// no matter what state the scheduler is in; we 
have to go to Finished
-// state to please the Preconditions of the close 
call
-if (scheduler.getState().getClass() != 
Finished.class) {
-scheduler.goToFinished(
-scheduler.getArchivedExecutionGraph(
-JobStatus.CANCELED, null));
-}
+scheduler.cancel();
+
 FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
 } catch (Throwable t) {
 closeFuture.completeExceptionally(t);
 }
 });
+
+// we have to wait for the job termination outside the main thread 
because the
+// cancellation tasks are scheduled on the main thread as well.
+scheduler
+.getJobTerminationFuture()
+.whenCompleteAsync(
+(jobStatus, error) -> {
+assertThat(scheduler.getState().getClass())
+.isEqualTo(Finished.class);
+
+if (error != null) {
+closeFuture.completeExceptionally(error);
+} else {
+try {
+
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
+} catch (Throwable t) {
+closeFuture.completeExceptionally(t);
+}
+}
+},
+executor);
 assertThatFuture(closeFuture).eventuallySucceeds();
 }
 }
@@ -310,7 +327,7 @@ public class AdaptiveSchedulerTest {
 final State state = scheduler.getState();
 
 assertThat(scheduler.isState(state)).isTrue();
-assertThat(scheduler.isState(new DummyState())).isFalse();
+assertThat(scheduler.isState(new DummyState(scheduler))).isFalse();
 }
 
 @Test
@@ -337,7 +354,7 @@ public class AdaptiveSchedulerTest {
 .build();
 
 AtomicBoolean ran = new AtomicBoolean(false);
-scheduler.runIfState(new DummyState(), () -> ran.set(true));
+scheduler.runIfState(new DummyState(scheduler), () -> ran.set(true));
 assertThat(ran.get()).isFalse();

(flink) branch master updated (740a5674fd9 -> 77da0414929)

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 740a5674fd9 [FLINK-34168][config] Refactor callers that use deprecated 
get/setXXX (#25301)
 new c43e7915ffe [FLINK-36013] [runtime] Introduce the transition from 
Restarting to CreatingExecutionGraph state
 new 77da0414929 [hotfix] Added ExecutionGraph validation while transiting 
to WaitingForResources state in RestartingTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/adaptive/AdaptiveScheduler.java  |  2 +
 .../runtime/scheduler/adaptive/Executing.java  |  1 +
 .../scheduler/adaptive/FailureResultUtil.java  |  1 +
 .../runtime/scheduler/adaptive/Restarting.java | 32 +++---
 .../scheduler/adaptive/StateTransitions.java   |  4 ++
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 36 +--
 .../runtime/scheduler/adaptive/RestartingTest.java | 70 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  | 10 +++-
 8 files changed, 124 insertions(+), 32 deletions(-)



(flink) 02/02: [hotfix] Added ExecutionGraph validation while transiting to WaitingForResources state in RestartingTest

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 77da0414929437687d7c19df18200a2528b6c27c
Author: Zdenek Tison 
AuthorDate: Fri Sep 6 08:26:38 2024 +0200

[hotfix] Added ExecutionGraph validation while transiting to 
WaitingForResources state in RestartingTest
---
 .../apache/flink/runtime/scheduler/adaptive/RestartingTest.java   | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index 77d9c8a2373..346f09b00eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -191,7 +191,7 @@ class RestartingTest {
 private final StateValidator 
cancellingStateValidator =
 new StateValidator<>("Cancelling");
 
-private final StateValidator waitingForResourcesStateValidator =
+private final StateValidator 
waitingForResourcesStateValidator =
 new StateValidator<>("WaitingForResources");
 
 private final StateValidator 
creatingExecutionGraphStateValidator =
@@ -202,7 +202,7 @@ class RestartingTest {
 }
 
 public void setExpectWaitingForResources() {
-waitingForResourcesStateValidator.expectInput((none) -> {});
+waitingForResourcesStateValidator.expectInput(assertNonNull());
 }
 
 public void setExpectCreatingExecutionGraph() {
@@ -225,8 +225,8 @@ class RestartingTest {
 public void archiveFailure(RootExceptionHistoryEntry failure) {}
 
 @Override
-public void goToWaitingForResources(ExecutionGraph 
previousExecutionGraph) {
-waitingForResourcesStateValidator.validateInput(null);
+public void goToWaitingForResources(@Nullable ExecutionGraph 
previousExecutionGraph) {
+
waitingForResourcesStateValidator.validateInput(previousExecutionGraph);
 hadStateTransition = true;
 }
 



(flink) 01/02: [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state

2024-09-12 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c43e7915ffe2932a266e6aa2294724bb39a603d1
Author: Zdenek Tison 
AuthorDate: Mon Aug 12 13:43:53 2024 +0200

[FLINK-36013] [runtime] Introduce the transition from Restarting to 
CreatingExecutionGraph state
---
 .../scheduler/adaptive/AdaptiveScheduler.java  |  2 +
 .../runtime/scheduler/adaptive/Executing.java  |  1 +
 .../scheduler/adaptive/FailureResultUtil.java  |  1 +
 .../runtime/scheduler/adaptive/Restarting.java | 32 +++
 .../scheduler/adaptive/StateTransitions.java   |  4 ++
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 36 +++--
 .../runtime/scheduler/adaptive/RestartingTest.java | 62 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  | 10 +++-
 8 files changed, 120 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 7ef897936d2..5686c0e47ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1229,6 +1229,7 @@ public class AdaptiveScheduler
 ExecutionGraphHandler executionGraphHandler,
 OperatorCoordinatorHandler operatorCoordinatorHandler,
 Duration backoffTime,
+boolean forcedRestart,
 List failureCollection) {
 
 for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
@@ -1249,6 +1250,7 @@ public class AdaptiveScheduler
 operatorCoordinatorHandler,
 LOG,
 backoffTime,
+forcedRestart,
 userCodeClassLoader,
 failureCollection));
 numRestarts++;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index bebc8a4..6139767a5c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -156,6 +156,7 @@ class Executing extends StateWithExecutionGraph
 getExecutionGraphHandler(),
 getOperatorCoordinatorHandler(),
 Duration.ofMillis(0L),
+true,
 getFailures());
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
index eb2c64eae99..bc16cafbf14 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java
@@ -30,6 +30,7 @@ public class FailureResultUtil {
 sweg.getExecutionGraphHandler(),
 sweg.getOperatorCoordinatorHandler(),
 failureResult.getBackoffTime(),
+false,
 sweg.getFailures());
 } else {
 sweg.getLogger().info("Failing job.", 
failureResult.getFailureCause());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index f647967edb4..1dd3f29778f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -42,7 +42,9 @@ class Restarting extends StateWithExecutionGraph {
 
 private final Duration backoffTime;
 
-@Nullable private ScheduledFuture goToWaitingForResourcesFuture;
+@Nullable private ScheduledFuture goToSubsequentStateFuture;
+
+private final boolean forcedRestart;
 
 Restarting(
 Context context,
@@ -51,6 +53,7 @@ class Restarting extends StateWithExecutionGraph {
 OperatorCoordinatorHandler operatorCoordinatorHandler,
 Logger logger,
 Duration backoffTime,
+boolean forcedRestart,
 ClassLoader userCodeClassLoader,
 List failureCollection) {
 super(
@@ -63,14 +66,15 @@ class Restarting extends StateWithExecutionGraph {
 failureCollection);
 this.context = context;
 this.backoffTime = backoffTime;
+this.forcedRestart = for

(flink) branch master updated (1d5b214eb68 -> db682b9869d)

2024-09-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 1d5b214eb68 [FLINK-36246] Move async state related operators 
flink-runtime (#25306)
 add db682b9869d [FLINK-36014] [runtime] Align the desired and sufficient 
resources definiton in Executing and WaitForResources states

No new revisions were added by this update.

Summary of changes:
 .../generated/all_jobmanager_section.html  | 18 ++---
 .../generated/expert_scheduling_section.html   | 18 ++---
 .../generated/job_manager_configuration.html   | 18 ++---
 .../flink/configuration/JobManagerOptions.java | 61 +---
 .../scheduler/adaptive/AdaptiveScheduler.java  | 51 +-
 .../adaptive/DefaultStateTransitionManager.java| 24 +++
 .../runtime/scheduler/adaptive/Executing.java  | 50 +++--
 .../EnforceMinimalIncreaseRescalingController.java | 48 -
 ...nforceParallelismChangeRescalingController.java | 41 ---
 .../scalingpolicy/RescalingController.java | 38 --
 .../adaptive/AdaptiveSchedulerClusterITCase.java   |  3 +
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 29 
 .../DefaultStateTransitionManagerTest.java | 60 
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 82 --
 .../adaptive/StateTrackingMockExecutionGraph.java  |  5 +-
 ...orceMinimalIncreaseRescalingControllerTest.java | 69 --
 ...ceParallelismChangeRescalingControllerTest.java | 61 
 17 files changed, 256 insertions(+), 420 deletions(-)
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingController.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceParallelismChangeRescalingController.java
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/RescalingController.java
 delete mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingControllerTest.java
 delete mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceParallelismChangeRescalingControllerTest.java



(flink) branch master updated: [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state

2024-09-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1f303a218f4 [FLINK-36012] [runtime] Integrate StateTransitionManager 
into WaitingForResources state
1f303a218f4 is described below

commit 1f303a218f47aa1a8eaf1323fc2a5debe914762d
Author: Zdenek Tison 
AuthorDate: Fri Jul 19 11:00:10 2024 +0200

[FLINK-36012] [runtime] Integrate StateTransitionManager into 
WaitingForResources state
---
 .../scheduler/adaptive/AdaptiveScheduler.java  |  50 +++-
 .../adaptive/DefaultStateTransitionManager.java|  88 +++
 .../runtime/scheduler/adaptive/Executing.java  |  12 +-
 .../scheduler/adaptive/StateTransitionManager.java |  11 -
 .../scheduler/adaptive/WaitingForResources.java| 109 -
 .../adaptive/AdaptiveSchedulerBuilder.java |   8 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 193 +---
 .../DefaultStateTransitionManagerTest.java |  34 +--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  51 +++--
 .../adaptive/TestingStateTransitionManager.java|  39 ++--
 .../adaptive/WaitingForResourcesTest.java  | 255 -
 11 files changed, 445 insertions(+), 405 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index ebcfc369c82..b5b98984a11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -136,6 +136,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.Temporal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,6 +187,24 @@ public class AdaptiveScheduler
 
 private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveScheduler.class);
 
+/**
+ * Named callback interface for creating {@code StateTransitionManager} 
instances. This internal
+ * interface allows for easier testing of the parameter injection in a 
unit test.
+ *
+ * @see
+ * 
DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context,
+ * Duration, Duration, Duration, Temporal)
+ */
+@FunctionalInterface
+interface StateTransitionManagerFactory {
+StateTransitionManager create(
+StateTransitionManager.Context context,
+Duration cooldownTimeout,
+@Nullable Duration resourceStabilizationTimeout,
+Duration maximumDelayForTrigger,
+Temporal lastStateTransition);
+}
+
 /**
  * Consolidated settings for the adaptive scheduler. This class is used to 
avoid passing around
  * multiple config options.
@@ -349,7 +369,7 @@ public class AdaptiveScheduler
 }
 
 private final Settings settings;
-private final StateTransitionManager.Factory stateTransitionManagerFactory;
+private final StateTransitionManagerFactory stateTransitionManagerFactory;
 
 private final JobGraph jobGraph;
 
@@ -427,7 +447,7 @@ public class AdaptiveScheduler
 throws JobExecutionException {
 this(
 settings,
-DefaultStateTransitionManager.Factory.fromSettings(settings),
+DefaultStateTransitionManager::new,
 (metricGroup, checkpointStatsListener) ->
 new DefaultCheckpointStatsTracker(
 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +475,7 @@ public class AdaptiveScheduler
 @VisibleForTesting
 AdaptiveScheduler(
 Settings settings,
-StateTransitionManager.Factory stateTransitionManagerFactory,
+StateTransitionManagerFactory stateTransitionManagerFactory,
 BiFunction
 checkpointStatsTrackerFactory,
 JobGraph jobGraph,
@@ -1143,10 +1163,20 @@ public class AdaptiveScheduler
 this,
 LOG,
 settings.getInitialResourceAllocationTimeout(),
-settings.getResourceStabilizationTimeout(),
+this::createWaitingForResourceStateTransitionManager,
 previousExecutionGraph));
 }
 
+private StateTransitionManager 
createWaitingForResourceStateTransitionManager(
+StateTransitionManager.Context ctx) {
+return stateTransitionManagerFactory.create(
+ctx,
+Duration.Z

(flink) branch master updated: [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager

2024-09-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c869326d089 [FLINK-36011] [runtime] Generalize RescaleManager to 
become StateTransitionManager
c869326d089 is described below

commit c869326d089705475481c2c2ea42a6efabb8c828
Author: Zdenek Tison 
AuthorDate: Fri Jul 12 15:01:55 2024 +0200

[FLINK-36011] [runtime] Generalize RescaleManager to become 
StateTransitionManager
---
 .../scheduler/adaptive/AdaptiveScheduler.java  |  10 +-
 .../scheduler/adaptive/DefaultRescaleManager.java  | 232 ---
 .../adaptive/DefaultStateTransitionManager.java| 434 +
 .../runtime/scheduler/adaptive/Executing.java  |  38 +-
 ...aleManager.java => StateTransitionManager.java} |  39 +-
 .../adaptive/AdaptiveSchedulerBuilder.java |  14 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   4 +-
 .../adaptive/DefaultRescaleManagerTest.java| 675 ---
 .../DefaultStateTransitionManagerTest.java | 722 +
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  41 +-
 ...ger.java => TestingStateTransitionManager.java} |  16 +-
 11 files changed, 1250 insertions(+), 975 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 2fd7b694b73..ebcfc369c82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -349,7 +349,7 @@ public class AdaptiveScheduler
 }
 
 private final Settings settings;
-private final RescaleManager.Factory rescaleManagerFactory;
+private final StateTransitionManager.Factory stateTransitionManagerFactory;
 
 private final JobGraph jobGraph;
 
@@ -427,7 +427,7 @@ public class AdaptiveScheduler
 throws JobExecutionException {
 this(
 settings,
-DefaultRescaleManager.Factory.fromSettings(settings),
+DefaultStateTransitionManager.Factory.fromSettings(settings),
 (metricGroup, checkpointStatsListener) ->
 new DefaultCheckpointStatsTracker(
 
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
@@ -455,7 +455,7 @@ public class AdaptiveScheduler
 @VisibleForTesting
 AdaptiveScheduler(
 Settings settings,
-RescaleManager.Factory rescaleManagerFactory,
+StateTransitionManager.Factory stateTransitionManagerFactory,
 BiFunction
 checkpointStatsTrackerFactory,
 JobGraph jobGraph,
@@ -480,7 +480,7 @@ public class AdaptiveScheduler
 assertPreconditions(jobGraph);
 
 this.settings = settings;
-this.rescaleManagerFactory = rescaleManagerFactory;
+this.stateTransitionManagerFactory = stateTransitionManagerFactory;
 
 this.jobGraph = jobGraph;
 this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), 
jobGraph.getName());
@@ -1175,7 +1175,7 @@ public class AdaptiveScheduler
 this,
 userCodeClassLoader,
 failureCollection,
-rescaleManagerFactory,
+stateTransitionManagerFactory,
 settings.getMinParallelismChangeForDesiredRescale(),
 settings.getRescaleOnFailedCheckpointCount()));
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
deleted file mode 100644
index 0b0fc013357..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific langu

(flink) 01/02: [hotfix][runtime] Adds ComponentMainThread to thread name of ComponentMainThreadExecutorServiceAdapter

2024-08-29 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e207a4598834b6665dbd12e6f964e924dfa67b1d
Author: Matthias Pohl 
AuthorDate: Thu Aug 1 08:43:56 2024 +0200

[hotfix][runtime] Adds ComponentMainThread to thread name of 
ComponentMainThreadExecutorServiceAdapter
---
 .../runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java| 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
index 55ac8bce611..690ccecf227 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java
@@ -78,6 +78,7 @@ public class ComponentMainThreadExecutorServiceAdapter 
implements ComponentMainT
 @Nonnull ScheduledExecutorService singleThreadExecutor) {
 final Thread thread =
 CompletableFuture.supplyAsync(Thread::currentThread, 
singleThreadExecutor).join();
+thread.setName(String.format("ComponentMainThread-%s", 
thread.getName()));
 return new 
ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread);
 }
 



(flink) 02/02: [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper lifecycle management (i.e. closing the scheduler before shutting down the main thread executor)

2024-08-29 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cff1da89cbd4f35b12c61a3e109db4a55682e44
Author: Matthias Pohl 
AuthorDate: Thu Aug 1 11:51:02 2024 +0200

[FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper 
lifecycle management (i.e. closing the scheduler before shutting down the main 
thread executor)
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 622 +++--
 1 file changed, 336 insertions(+), 286 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 70babe57449..ab0b96fa765 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -119,6 +119,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -188,12 +190,55 @@ public class AdaptiveSchedulerTest {
 
 private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
 
+private AdaptiveScheduler scheduler;
+
+@BeforeEach
+void before() {
+scheduler = null;
+}
+
+@AfterEach
+void after() {
+closeInExecutorService(scheduler, singleThreadMainThreadExecutor);
+}
+
+private static void closeInExecutorService(
+@Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor 
executor) {
+if (scheduler != null) {
+final CompletableFuture closeFuture = new 
CompletableFuture<>();
+executor.execute(
+() -> {
+try {
+// no matter what state the scheduler is in; we 
have to go to Finished
+// state to please the Preconditions of the close 
call
+if (scheduler.getState().getClass() != 
Finished.class) {
+scheduler.goToFinished(
+scheduler.getArchivedExecutionGraph(
+JobStatus.CANCELED, null));
+}
+FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
+} catch (Throwable t) {
+closeFuture.completeExceptionally(t);
+}
+});
+assertThatFuture(closeFuture).eventuallySucceeds();
+}
+}
+
+private void startTestInstanceInMainThread() {
+runInMainThread(() -> scheduler.startScheduling());
+}
+
+private void runInMainThread(Runnable callback) {
+CompletableFuture.runAsync(callback, 
singleThreadMainThreadExecutor).join();
+}
+
 @Test
 void testInitialState() throws Exception {
-final AdaptiveScheduler scheduler =
+scheduler =
 new AdaptiveSchedulerBuilder(
 createJobGraph(),
-mainThreadExecutor,
+singleThreadMainThreadExecutor,
 EXECUTOR_RESOURCE.getExecutor())
 .build();
 
@@ -206,12 +251,14 @@ public class AdaptiveSchedulerTest {
 jobGraph.setSnapshotSettings(
 new JobCheckpointingSettings(
 CheckpointCoordinatorConfiguration.builder().build(), 
null));
-
-final ArchivedExecutionGraph archivedExecutionGraph =
+scheduler =
 new AdaptiveSchedulerBuilder(
-jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-.build()
-.getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
+jobGraph,
+singleThreadMainThreadExecutor,
+EXECUTOR_RESOURCE.getExecutor())
+.build();
+final ArchivedExecutionGraph archivedExecutionGraph =
+scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, 
null);
 
 
ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph);
 }
@@ -223,11 +270,14 @@ public class AdaptiveSchedulerTest {
 new JobCheckpointingSettings(
 CheckpointCoordinatorConfiguration.builder().bu

(flink) branch master updated (2ca359a140b -> 7cff1da89cb)

2024-08-29 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 2ca359a140b [FLINK-36116] Update Javadoc plugin. This closes #25265
 new e207a459883 [hotfix][runtime] Adds ComponentMainThread to thread name 
of ComponentMainThreadExecutorServiceAdapter
 new 7cff1da89cb [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to 
execute proper lifecycle management (i.e. closing the scheduler before shutting 
down the main thread executor)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ComponentMainThreadExecutorServiceAdapter.java |   1 +
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 622 +++--
 2 files changed, 337 insertions(+), 286 deletions(-)



(flink) branch master updated: [FLINK-33607][build] Updates Maven wrapper version and enables checksum verification for Maven wrapper

2024-08-14 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e9dd4683f75 [FLINK-33607][build] Updates Maven wrapper version and 
enables checksum verification for Maven wrapper
e9dd4683f75 is described below

commit e9dd4683f758b463d0b5ee18e49cecef6a70c5cf
Author: Luke Chen 
AuthorDate: Wed Aug 14 10:52:40 2024 +0800

[FLINK-33607][build] Updates Maven wrapper version and enables checksum 
verification for Maven wrapper
---
 .mvn/wrapper/maven-wrapper.properties |   5 +-
 mvnw  | 256 +++---
 mvnw.cmd  |  21 +--
 3 files changed, 153 insertions(+), 129 deletions(-)

diff --git a/.mvn/wrapper/maven-wrapper.properties 
b/.mvn/wrapper/maven-wrapper.properties
index 20a44b1df3f..0fe40ae8c5c 100644
--- a/.mvn/wrapper/maven-wrapper.properties
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -18,6 +18,5 @@
 # updating the Maven version requires updates to certain documentation and 
verification logic
 
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip
 
distributionSha256Sum=ccf20a80e75a17ffc34d47c5c95c98c39d426ca17d670f09cd91e877072a9309
-wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
-# TODO FLINK-33607: checksum verification doesn't seem to work under windows
-# 
wrapperSha256Sum=e63a53cfb9c4d291ebe3c2b0edacb7622bbc480326beaa5a0456e412f52f066a
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.3.2/maven-wrapper-3.3.2.jar
+wrapperSha256Sum=3d8f20ce6103913be8b52aef6d994e0c54705fb527324ceb9b835b338739c7a8
diff --git a/mvnw b/mvnw
index 8d937f4c14f..5e9618cac26 100755
--- a/mvnw
+++ b/mvnw
@@ -19,7 +19,7 @@
 # 
 
 # 
-# Apache Maven Wrapper startup batch script, version 3.2.0
+# Apache Maven Wrapper startup batch script, version 3.3.2
 #
 # Required ENV vars:
 # --
@@ -33,75 +33,84 @@
 #   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
 # 
 
-if [ -z "$MAVEN_SKIP_RC" ] ; then
+if [ -z "$MAVEN_SKIP_RC" ]; then
 
-  if [ -f /usr/local/etc/mavenrc ] ; then
+  if [ -f /usr/local/etc/mavenrc ]; then
 . /usr/local/etc/mavenrc
   fi
 
-  if [ -f /etc/mavenrc ] ; then
+  if [ -f /etc/mavenrc ]; then
 . /etc/mavenrc
   fi
 
-  if [ -f "$HOME/.mavenrc" ] ; then
+  if [ -f "$HOME/.mavenrc" ]; then
 . "$HOME/.mavenrc"
   fi
 
 fi
 
 # OS specific support.  $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
+cygwin=false
+darwin=false
 mingw=false
 case "$(uname)" in
-  CYGWIN*) cygwin=true ;;
-  MINGW*) mingw=true;;
-  Darwin*) darwin=true
-# Use /usr/libexec/java_home if available, otherwise fall back to 
/Library/Java/Home
-# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
-if [ -z "$JAVA_HOME" ]; then
-  if [ -x "/usr/libexec/java_home" ]; then
-JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
-  else
-JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
-  fi
+CYGWIN*) cygwin=true ;;
+MINGW*) mingw=true ;;
+Darwin*)
+  darwin=true
+  # Use /usr/libexec/java_home if available, otherwise fall back to 
/Library/Java/Home
+  # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+  if [ -z "$JAVA_HOME" ]; then
+if [ -x "/usr/libexec/java_home" ]; then
+  JAVA_HOME="$(/usr/libexec/java_home)"
+  export JAVA_HOME
+else
+  JAVA_HOME="/Library/Java/Home"
+  export JAVA_HOME
 fi
-;;
+  fi
+  ;;
 esac
 
-if [ -z "$JAVA_HOME" ] ; then
-  if [ -r /etc/gentoo-release ] ; then
+if [ -z "$JAVA_HOME" ]; then
+  if [ -r /etc/gentoo-release ]; then
 JAVA_HOME=$(java-config --jre-home)
   fi
 fi
 
 # For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
-  [ -n "$JAVA_HOME" ] &&
-JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
-  [ -n "$CLASSPATH" ] &&
-CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+if $cygwin; then
+  [ -n "$JAVA_HOME" ] \
+&& JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] \
+&& CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
 fi
 
 # For Mingw, ensure paths are in UNIX format before anything is touched
-if $mingw ; then
-  [ -n "$JAVA_HO

(flink) branch master updated: [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1f03a0a62ec [FLINK-35553][runtime] Wire-up RescaleManager with 
CheckpointStatsListener in Executing state
1f03a0a62ec is described below

commit 1f03a0a62ec371d24b194759e6c95bf661501e07
Author: David Moravek 
AuthorDate: Thu Oct 26 12:52:04 2023 +0200

[FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener 
in Executing state
---
 .../generated/all_jobmanager_section.html  |   8 +-
 .../generated/expert_scheduling_section.html   |   8 +-
 .../generated/job_manager_configuration.html   |   8 +-
 .../flink/configuration/JobManagerOptions.java |  21 ++-
 .../checkpoint/CheckpointStatsListener.java|  33 +
 .../checkpoint/DefaultCheckpointStatsTracker.java  |  29 
 .../scheduler/adaptive/AdaptiveScheduler.java  | 148 +--
 .../scheduler/adaptive/DefaultRescaleManager.java  |  43 +++---
 .../runtime/scheduler/adaptive/Executing.java  |  57 +++-
 .../flink/runtime/scheduler/adaptive/State.java|  34 -
 .../DefaultCheckpointStatsTrackerTest.java |  67 +
 .../adaptive/AdaptiveSchedulerBuilder.java |  40 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 156 -
 .../adaptive/DefaultRescaleManagerTest.java|   3 +-
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 128 -
 .../test/scheduling/RescaleOnCheckpointITCase.java | 146 +++
 16 files changed, 864 insertions(+), 65 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 4539ce365d5..760ca029e04 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -12,7 +12,7 @@
 
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
 (none)
 Duration
-The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if 
checkpointing is enabled).
 
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
@@ -32,6 +32,12 @@
 Duration
 The maximum time the JobManager will wait to acquire all 
required resources after a job submission or restart. Once elapsed it will try 
to run the job with a lower parallelism, or fail if the minimum amount of 
resources could not be acquired.Increasing this value will make the 
cluster more resilient against temporary resources shortages (e.g., there is 
more time for a failed TaskManager to be restarted).Setting a negative 
duration will disable the resource tim [...]
 
+
+
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count
+2
+Integer
+The number of consecutive failed checkpoints that will trigger 
rescaling even in the absence of a completed checkpoint.
+
 
 
jobmanager.adaptive-scheduler.scaling-interval.max
 (none)
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 10e5ad134ce..6be6547547c 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -90,7 +90,7 @@
 
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
 (none)
 Duration
-The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and the checkpointing interval multiplied by the by-1-incremented 
parameter value of 
jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if 
checkpointing is enabled).
 
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
@@ -110,6 +110,12 @@
 Duration
 The maximum time the JobManager will wait to acquire all 
required resources after a job

(flink) 04/06: [hotfix][runtime] Make CheckpointStatsTracker not rely on numberOfTotalSubtasks

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91cf5d823ed6cb8c899e69187765dedc59c060c7
Author: Matthias Pohl 
AuthorDate: Fri May 31 15:58:34 2024 +0200

[hotfix][runtime] Make CheckpointStatsTracker not rely on 
numberOfTotalSubtasks

I double-checked that the JobInitializationMetricsBuilder constructor is 
called for every local restart of tasks and the 
JobInitializationMetricsBuilder#reportInitializationMetrics is called by the 
deployed Subtask after restoring its state. ExecutionAttemptID is correct 
because the builder creation happens again when a new Execution attempt is 
triggered (through the local restart). The totalNumberOfSubtasks was a bit 
misleading here, in my opinion.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 12 +++-
 .../checkpoint/CheckpointCoordinatorGateway.java   |  4 +-
 .../runtime/checkpoint/CheckpointStatsTracker.java | 41 
 .../JobInitializationMetricsBuilder.java   | 61 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  6 +-
 .../scheduler/DefaultExecutionGraphFactory.java| 20 +-
 .../runtime/scheduler/ExecutionGraphHandler.java   |  7 +-
 .../flink/runtime/scheduler/SchedulerBase.java |  7 +-
 .../flink/runtime/scheduler/SchedulerNG.java   |  4 +-
 .../scheduler/adaptive/AdaptiveScheduler.java  |  7 +-
 .../adaptive/StateWithExecutionGraph.java  |  7 +-
 .../flink/runtime/state/TaskStateManagerImpl.java  |  3 +-
 .../taskexecutor/rpc/RpcCheckpointResponder.java   |  7 +-
 .../runtime/taskmanager/CheckpointResponder.java   |  4 +-
 .../CheckpointCoordinatorFailureTest.java  |  6 +-
 .../CheckpointCoordinatorMasterHooksTest.java  |  5 +-
 .../checkpoint/CheckpointCoordinatorTest.java  | 23 ---
 .../CheckpointCoordinatorTestingUtils.java |  5 +-
 .../checkpoint/CheckpointStatsTrackerTest.java | 74 +-
 .../checkpoint/JobInitializationMetricsTest.java   | 22 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |  4 +-
 .../TestingDefaultExecutionGraphBuilder.java   |  8 ++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  4 +-
 .../OperatorCoordinatorHolderTest.java |  6 +-
 .../AbstractCheckpointStatsHandlerTest.java|  4 +-
 .../DefaultExecutionGraphFactoryTest.java  | 33 +-
 .../runtime/scheduler/TestingSchedulerNG.java  |  4 +-
 .../taskmanager/NoOpCheckpointResponder.java   |  4 +-
 .../taskmanager/TestCheckpointResponder.java   |  4 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  4 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |  4 +-
 .../util/CompletingCheckpointResponder.java|  4 +-
 32 files changed, 246 insertions(+), 162 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 369a3d6f177..7d318bb4cc9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toMap;
@@ -1760,7 +1761,14 @@ public class CheckpointCoordinator {
 throw new IllegalStateException("CheckpointCoordinator is shut 
down");
 }
 long restoreTimestamp = 
SystemClock.getInstance().absoluteTimeMillis();
-statsTracker.reportInitializationStartTs(restoreTimestamp);
+statsTracker.reportInitializationStarted(
+tasks.stream()
+.map(ExecutionJobVertex::getTaskVertices)
+.flatMap(Stream::of)
+.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getAttemptId)
+.collect(Collectors.toSet()),
+restoreTimestamp);
 
 // Restore from the latest checkpoint
 CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
@@ -2158,7 +2166,7 @@ public class CheckpointCoordinator {
 public void reportInitializationMetrics(
 ExecutionAttemptID executionAttemptID,
 SubTaskInitializationMetrics initializationMetrics) {
-statsTracker.reportInitializationMetrics(initializationMetrics);
+statsTracker.reportInitializationMetrics(executionAttemptID, 
initializat

(flink) 05/06: [hotfix][runtime] Extracts CheckpointStatsTracker into interface

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7c67d011dd38167336282280c97853f4e009104f
Author: Matthias Pohl 
AuthorDate: Thu Jun 20 16:33:30 2024 +0200

[hotfix][runtime] Extracts CheckpointStatsTracker into interface

- Introduces DefaultCheckpointStatsTracker
- Introduces NoOpCheckpointStatsTracker
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   7 +-
 .../runtime/checkpoint/CheckpointStatsTracker.java | 558 ++---
 ...ker.java => DefaultCheckpointStatsTracker.java} |  83 +--
 .../checkpoint/NoOpCheckpointStatsTracker.java |  81 +++
 .../scheduler/DefaultExecutionGraphFactory.java|   3 +-
 .../CheckpointCoordinatorFailureTest.java  |   2 +-
 .../CheckpointCoordinatorMasterHooksTest.java  |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java  |  14 +-
 .../CheckpointCoordinatorTestingUtils.java |   2 +-
 .../CheckpointCoordinatorTriggeringTest.java   |  35 ++
 ...java => DefaultCheckpointStatsTrackerTest.java} |  78 +--
 .../flink/runtime/dispatcher/DispatcherTest.java   |   3 +-
 .../TestingDefaultExecutionGraphBuilder.java   |   8 +-
 .../OperatorCoordinatorHolderTest.java |   7 +-
 .../AbstractCheckpointStatsHandlerTest.java|   3 +-
 .../DefaultExecutionGraphFactoryTest.java  |   2 +
 16 files changed, 238 insertions(+), 650 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 7d318bb4cc9..1606cad4126 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -2429,7 +2429,12 @@ public class CheckpointCoordinator {
 }
 
 private void reportFinishedTasks(
-PendingCheckpointStats pendingCheckpointStats, List 
finishedTasks) {
+@Nullable PendingCheckpointStats pendingCheckpointStats,
+List finishedTasks) {
+if (pendingCheckpointStats == null) {
+return;
+}
+
 long now = System.currentTimeMillis();
 finishedTasks.forEach(
 execution ->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 654bc8c3422..b620c0ce4c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -18,34 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.traces.Span;
-import org.apache.flink.traces.SpanBuilder;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.StringWriter;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Tracker for checkpoint statistics.
@@ -63,141 +45,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * The statistics are accessed via {@link #createSnapshot()} and exposed 
via both the web
  * frontend and the {@link Metric} system.
  */
-public class CheckpointStatsTracker {
-
-private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointStatsTracker.class);
-private static final ObjectMapper MAPPER = 
RestMapperUtils.getStrictObjectMapper();
-
-/**
- * Lock used to update stats and creating snapshots. Updates always happen 
from a single Thread
- * at a time and there can be multiple concurrent read accesses to the 
latest stats snapshot.
- *
- * Currently, writes are executed by whatever Thread executes the 
coordinator actions (which
- * already happens in locked scope). Reads can come from multiple 
concurrent Netty e

(flink) 06/06: [FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed.

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 482969aa24df7633a0bbaac74c4d4f6cd6ab1333
Author: Matthias Pohl 
AuthorDate: Thu Jun 20 16:33:30 2024 +0200

[FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed.

The checkpoint tracker doesn't live in the DefaultExecutionGraphFactory 
anymore but is moved into the AdaptiveScheduler. This will allow the scheduler 
to react to checkpoint-related events.
---
 .../flink/util/function/CachingSupplier.java   | 42 ---
 .../flink/util/function/CachingSupplierTest.java   | 38 -
 .../executiongraph/DefaultExecutionGraph.java  |  6 ---
 .../DefaultExecutionGraphBuilder.java  |  5 +--
 .../runtime/executiongraph/ExecutionGraph.java |  3 --
 .../scheduler/DefaultExecutionGraphFactory.java| 14 +--
 .../runtime/scheduler/ExecutionGraphFactory.java   |  4 ++
 .../runtime/scheduler/ExecutionGraphHandler.java   | 25 +---
 .../flink/runtime/scheduler/SchedulerBase.java | 13 ++
 .../flink/runtime/scheduler/SchedulerUtils.java| 12 ++
 .../scheduler/adaptive/AdaptiveScheduler.java  | 11 +
 .../TestingDefaultExecutionGraphBuilder.java   | 19 -
 .../DefaultExecutionGraphFactoryTest.java  | 47 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |  6 ---
 14 files changed, 100 insertions(+), 145 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java 
b/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java
deleted file mode 100644
index d1bfce1fe53..000
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.util.function.Supplier;
-
-/** A {@link Supplier} that returns a single, lazily instantiated, value. */
-@NotThreadSafe
-public class CachingSupplier implements Supplier {
-private final Supplier backingSupplier;
-private @Nullable T cachedValue;
-
-public CachingSupplier(Supplier backingSupplier) {
-this.backingSupplier = backingSupplier;
-}
-
-@Override
-public T get() {
-if (cachedValue == null) {
-cachedValue = backingSupplier.get();
-}
-return cachedValue;
-}
-}
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
deleted file mode 100644
index c2ed0d7018c..000
--- 
a/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.flink.util.function;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class CachingSupplierTest {
-
-@Test
-void testCaching() {
-final AtomicInteger instantiationCounts = new AtomicInteg

(flink) 03/06: [hotfix][runtime] Adds missing @Nullable annotation

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 446b582ed701e8c6f9f2c98aaacab1ea7ee0b64e
Author: Matthias Pohl 
AuthorDate: Fri May 31 16:16:39 2024 +0200

[hotfix][runtime] Adds missing @Nullable annotation
---
 .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index e9b1317e46e..44fa1a9b68b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -67,7 +67,7 @@ public class CreatingExecutionGraph extends 
StateWithoutExecutionGraph {
 executionGraphWithParallelismFuture,
 Logger logger,
 OperatorCoordinatorHandlerFactory operatorCoordinatorFactory,
-ExecutionGraph previousExecutionGraph1) {
+@Nullable ExecutionGraph previousExecutionGraph) {
 super(context, logger);
 this.context = context;
 this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory;
@@ -83,7 +83,7 @@ public class CreatingExecutionGraph extends 
StateWithoutExecutionGraph {
 Duration.ZERO);
 return null;
 }));
-previousExecutionGraph = previousExecutionGraph1;
+this.previousExecutionGraph = previousExecutionGraph;
 }
 
 private void handleExecutionGraphCreation(



(flink) 02/06: [hotfix][runtime] Remove no longer used CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method.

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 097fc7d470ee8ab37440a96ca0d9a5eb721f5204
Author: David Moravek 
AuthorDate: Thu Sep 28 11:15:37 2023 -0700

[hotfix][runtime] Remove no longer used 
CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method.
---
 .../flink/runtime/checkpoint/CheckpointCoordinator.java  | 16 
 1 file changed, 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c05efb10b48..369a3d6f177 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1625,22 +1625,6 @@ public class CheckpointCoordinator {
 }
 }
 
-/**
- * Fails all pending checkpoints which have not been acknowledged by the 
given execution attempt
- * id.
- *
- * @param executionAttemptId for which to discard unacknowledged pending 
checkpoints
- * @param cause of the failure
- */
-public void failUnacknowledgedPendingCheckpointsFor(
-ExecutionAttemptID executionAttemptId, Throwable cause) {
-synchronized (lock) {
-abortPendingCheckpoints(
-checkpoint -> 
!checkpoint.isAcknowledgedBy(executionAttemptId),
-new 
CheckpointException(CheckpointFailureReason.TASK_FAILURE, cause));
-}
-}
-
 private void rememberRecentExpiredCheckpointId(long id) {
 if (recentExpiredCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
 recentExpiredCheckpoints.removeFirst();



(flink) 01/06: [hotfix][runtime] Removes unused method

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be938287bfa45538bdcaf0491ef9583841e3447f
Author: Matthias Pohl 
AuthorDate: Fri May 31 14:43:43 2024 +0200

[hotfix][runtime] Removes unused method
---
 .../org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java  | 5 -
 1 file changed, 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index ea04211d6f0..fbdf97bd408 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -146,11 +146,6 @@ public class CheckpointStatsTracker {
 return this;
 }
 
-@VisibleForTesting
-Optional 
getJobInitializationMetricsBuilder() {
-return jobInitializationMetricsBuilder;
-}
-
 /**
  * Creates a new snapshot of the available stats.
  *



(flink) branch master updated (0b60bba6715 -> 482969aa24d)

2024-07-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 0b60bba6715 Revert "[FLINK-33211][table] support flink table lineage 
(#24618)"
 new be938287bfa [hotfix][runtime] Removes unused method
 new 097fc7d470e [hotfix][runtime] Remove no longer used 
CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method.
 new 446b582ed70 [hotfix][runtime] Adds missing @Nullable annotation
 new 91cf5d823ed [hotfix][runtime] Make CheckpointStatsTracker not rely on 
numberOfTotalSubtasks
 new 7c67d011dd3 [hotfix][runtime] Extracts CheckpointStatsTracker into 
interface
 new 482969aa24d [FLINK-35552][runtime] Rework how CheckpointStatsTracker 
is constructed.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/util/function/CachingSupplier.java   |  42 --
 .../flink/util/function/CachingSupplierTest.java   |  38 --
 .../runtime/checkpoint/CheckpointCoordinator.java  |  35 +-
 .../checkpoint/CheckpointCoordinatorGateway.java   |   4 +-
 .../runtime/checkpoint/CheckpointStatsTracker.java | 580 ++---
 ...ker.java => DefaultCheckpointStatsTracker.java} | 129 ++---
 .../JobInitializationMetricsBuilder.java   |  61 ++-
 .../checkpoint/NoOpCheckpointStatsTracker.java |  81 +++
 .../executiongraph/DefaultExecutionGraph.java  |   6 -
 .../DefaultExecutionGraphBuilder.java  |   5 +-
 .../runtime/executiongraph/ExecutionGraph.java |   3 -
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   6 +-
 .../scheduler/DefaultExecutionGraphFactory.java|  29 +-
 .../runtime/scheduler/ExecutionGraphFactory.java   |   4 +
 .../runtime/scheduler/ExecutionGraphHandler.java   |  28 +-
 .../flink/runtime/scheduler/SchedulerBase.java |  20 +-
 .../flink/runtime/scheduler/SchedulerNG.java   |   4 +-
 .../flink/runtime/scheduler/SchedulerUtils.java|  12 +
 .../scheduler/adaptive/AdaptiveScheduler.java  |  18 +-
 .../scheduler/adaptive/CreatingExecutionGraph.java |   4 +-
 .../adaptive/StateWithExecutionGraph.java  |   7 +-
 .../flink/runtime/state/TaskStateManagerImpl.java  |   3 +-
 .../taskexecutor/rpc/RpcCheckpointResponder.java   |   7 +-
 .../runtime/taskmanager/CheckpointResponder.java   |   4 +-
 .../CheckpointCoordinatorFailureTest.java  |   8 +-
 .../CheckpointCoordinatorMasterHooksTest.java  |   5 +-
 .../checkpoint/CheckpointCoordinatorTest.java  |  35 +-
 .../CheckpointCoordinatorTestingUtils.java |   5 +-
 .../CheckpointCoordinatorTriggeringTest.java   |  35 ++
 ...java => DefaultCheckpointStatsTrackerTest.java} | 138 +++--
 .../checkpoint/JobInitializationMetricsTest.java   |  22 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |   7 +-
 .../TestingDefaultExecutionGraphBuilder.java   |  21 +-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   4 +-
 .../OperatorCoordinatorHolderTest.java |   5 +-
 .../AbstractCheckpointStatsHandlerTest.java|   7 +-
 .../DefaultExecutionGraphFactoryTest.java  |  66 ++-
 .../runtime/scheduler/TestingSchedulerNG.java  |   4 +-
 .../adaptive/StateTrackingMockExecutionGraph.java  |   6 -
 .../taskmanager/NoOpCheckpointResponder.java   |   4 +-
 .../taskmanager/TestCheckpointResponder.java   |   4 +-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |   4 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |   4 +-
 .../util/CompletingCheckpointResponder.java|   4 +-
 44 files changed, 552 insertions(+), 966 deletions(-)
 delete mode 100644 
flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java
 delete mode 100644 
flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/{CheckpointStatsTracker.java
 => DefaultCheckpointStatsTracker.java} (83%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/NoOpCheckpointStatsTracker.java
 rename 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/{CheckpointStatsTrackerTest.java
 => DefaultCheckpointStatsTrackerTest.java} (82%)



(flink) branch master updated: Revert "[FLINK-33211][table] support flink table lineage (#24618)"

2024-07-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 0b60bba6715 Revert "[FLINK-33211][table] support flink table lineage 
(#24618)"
0b60bba6715 is described below

commit 0b60bba6715991d3bc9332956ca6abfef983ca79
Author: Matthias Pohl 
AuthorDate: Wed Jul 3 08:34:07 2024 +0200

Revert "[FLINK-33211][table] support flink table lineage (#24618)"

This reverts commit 960363cd3c6c82f7e56ef781295756105f7b5eba.
---
 .../api/functions/source/FromElementsFunction.java |  30 +
 .../flink/streaming/api/graph/StreamGraph.java |   9 --
 .../streaming/api/graph/StreamGraphGenerator.java  |   6 +-
 .../api/lineage/DefaultLineageDataset.java |  54 
 .../streaming/api/lineage/DefaultLineageEdge.java  |  46 ---
 .../streaming/api/lineage/DefaultLineageGraph.java |   8 +-
 .../flink/streaming/api/lineage/LineageGraph.java  |   5 +-
 .../streaming/api/lineage/LineageGraphUtils.java   |  86 
 .../transformations/LegacySinkTransformation.java  |   2 +-
 .../LegacySourceTransformation.java|   2 +-
 .../api/transformations/SinkTransformation.java|   2 +-
 .../api/transformations/SourceTransformation.java  |   2 +-
 .../transformations/TransformationWithLineage.java |  75 ---
 .../translators/SinkTransformationTranslator.java  |   2 +-
 .../apache/flink/table/operations/ModifyType.java  |  31 -
 .../table/operations/SinkModifyOperation.java  |  10 ++
 flink-table/flink-table-planner/pom.xml|   9 +-
 .../table/planner/lineage/TableLineageDataset.java |  37 -
 .../planner/lineage/TableLineageDatasetImpl.java   | 100 --
 .../table/planner/lineage/TableLineageUtils.java   |  93 -
 .../planner/lineage/TableSinkLineageVertex.java|  34 -
 .../lineage/TableSinkLineageVertexImpl.java|  47 ---
 .../planner/lineage/TableSourceLineageVertex.java  |  26 
 .../lineage/TableSourceLineageVertexImpl.java  |  47 ---
 .../operations/SqlNodeToOperationConversion.java   |   8 +-
 .../plan/nodes/exec/common/CommonExecSink.java |  54 ++--
 .../exec/common/CommonExecTableSourceScan.java |  61 ++---
 .../flink/connector/source/ValuesSource.java   |  44 +-
 .../factories/TestValuesRuntimeFunctions.java  |  96 ++---
 .../plan/batch/sql/TableLineageGraphTest.java  |  37 -
 .../plan/common/TableLineageGraphTestBase.java | 150 -
 .../plan/nodes/exec/TransformationsTest.java   |  20 ---
 .../plan/stream/sql/TableLineageGraphTest.java |  37 -
 .../test/resources/lineage-graph/query-batch.json  |  70 --
 .../test/resources/lineage-graph/query-stream.json |  70 --
 .../test/resources/lineage-graph/union-batch.json  | 117 
 .../test/resources/lineage-graph/union-stream.json | 117 
 .../flink/table/planner/utils/TableTestBase.scala  |  17 ---
 .../operators/values/ValuesInputFormat.java|  29 +---
 pom.xml|   1 -
 40 files changed, 60 insertions(+), 1631 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index c183b6494e2..aff13245afa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -25,18 +25,12 @@ import 
org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
-import org.apache.flink.streaming.api.lineage.LineageDataset;
-import org.apache.flink.streaming.api.lineage.LineageVertex;
-import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
-import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.util.It

(flink) 01/02: [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow

2024-07-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab856fc6c8b1cdd402c11f7755b01a51aafd8f4c
Author: morazow 
AuthorDate: Fri Mar 1 11:38:36 2024 +0100

[FLINK-34487][ci] Adds Python Wheels nightly GHA workflow

Co-authored-by: Matthias Pohl 
---
 .github/actions/stringify/action.yml| 42 +
 .github/workflows/nightly.yml   | 48 +
 .github/workflows/template.flink-ci.yml | 15 ---
 3 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/.github/actions/stringify/action.yml 
b/.github/actions/stringify/action.yml
new file mode 100644
index 000..e05547f78dd
--- /dev/null
+++ b/.github/actions/stringify/action.yml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "Stringify"
+description: "Stringifies the given input value."
+inputs:
+  value:
+description: "Input value to stringify."
+required: true
+outputs:
+  stringified_value:
+description: "Stringified output value."
+value: ${{ steps.stringify-step.outputs.stringified_value }}
+runs:
+  using: "composite"
+  steps:
+- name: "Stringify '${{ inputs.value }}'"
+  id: stringify-step
+  shell: bash
+  run: |
+# adds a stringified version of the workflow name that can be used for 
generating unique build artifact names within a composite workflow
+# - replaces any special characters (except for underscores and dots) 
with dashes
+# - makes the entire string lowercase
+# - condenses multiple dashes into a single one
+# - removes leading and following dashes
+stringified_value=$(echo "${{ inputs.value }}" | tr -C '[:alnum:]._' 
'-' |  tr '[:upper:]' '[:lower:]' | sed -e 's/--*/-/g' -e 's/^-*//g' -e 
's/-*$//g')
+echo "stringified_value=${stringified_value}" >> $GITHUB_OUTPUT
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 2090a3b1156..67994be49a5 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -94,3 +94,51 @@ jobs:
   s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }}
   s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }}
   s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }}
+
+  build_python_wheels:
+name: "Build Python Wheels on ${{ matrix.os }}"
+runs-on: ${{ matrix.os }}
+strategy:
+  fail-fast: false
+  matrix:
+include:
+  - os: ubuntu-latest
+os_name: linux
+python-version: 3.9
+  - os: macos-latest
+os_name: macos
+python-version: 3.9
+steps:
+  - name: "Checkout the repository"
+uses: actions/checkout@v4
+with:
+  fetch-depth: 0
+  persist-credentials: false
+  - name: "Install python"
+uses: actions/setup-python@v5
+with:
+  python-version: ${{ matrix.python-version }}
+  - name: "Stringify workflow name"
+uses: "./.github/actions/stringify"
+id: stringify_workflow
+with:
+  value: ${{ github.workflow }}
+  - name: "Build python wheels for ${{ matrix.os_name }}"
+if: matrix.os_name == 'linux'
+run: |
+  cd flink-python
+  bash dev/build-wheels.sh
+  - name: "Build python wheels for ${{ matrix.os }}"
+if: matrix.os_name == 'macos'
+run: |
+  cd flink-python
+  python -m pip install --upgrade pip
+  pip install cibuildwheel==2.8.0
+  cibuildwheel --platform macos --output-dir dist .
+  - name: "Tar python artifact"
+run: tar -czvf python-wheel-${{ matrix.os_name }}.tar.gz -C 
flink-python/dist .
+  - name: "Upload python artifact"
+uses: actions/upload-artifact@v4
+with:
+  name: wheel_${

(flink) 02/02: [FLINK-34582] Updates cibuildwheel to support cpython 3.11 wheel package

2024-07-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c75248e48f41d227be934c9754f7dd3f3b571e37
Author: morazow 
AuthorDate: Thu May 23 16:51:10 2024 +0200

[FLINK-34582] Updates cibuildwheel to support cpython 3.11 wheel package
---
 .github/workflows/nightly.yml | 39 +--
 flink-python/pyproject.toml   |  3 +++
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 67994be49a5..adbdf660888 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -96,7 +96,7 @@ jobs:
   s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }}
 
   build_python_wheels:
-name: "Build Python Wheels on ${{ matrix.os }}"
+name: "Build Python Wheels on ${{ matrix.os_name }}"
 runs-on: ${{ matrix.os }}
 strategy:
   fail-fast: false
@@ -104,37 +104,40 @@ jobs:
 include:
   - os: ubuntu-latest
 os_name: linux
-python-version: 3.9
   - os: macos-latest
 os_name: macos
-python-version: 3.9
 steps:
   - name: "Checkout the repository"
 uses: actions/checkout@v4
 with:
   fetch-depth: 0
   persist-credentials: false
-  - name: "Install python"
-uses: actions/setup-python@v5
-with:
-  python-version: ${{ matrix.python-version }}
   - name: "Stringify workflow name"
 uses: "./.github/actions/stringify"
 id: stringify_workflow
 with:
   value: ${{ github.workflow }}
+  # Used to host cibuildwheel
+  - name: "Setup Python"
+uses: actions/setup-python@v5
+with:
+  python-version: '3.x'
+  - name: "Install cibuildwheel"
+run: python -m pip install cibuildwheel==2.16.5
   - name: "Build python wheels for ${{ matrix.os_name }}"
-if: matrix.os_name == 'linux'
-run: |
-  cd flink-python
-  bash dev/build-wheels.sh
-  - name: "Build python wheels for ${{ matrix.os }}"
-if: matrix.os_name == 'macos'
-run: |
-  cd flink-python
-  python -m pip install --upgrade pip
-  pip install cibuildwheel==2.8.0
-  cibuildwheel --platform macos --output-dir dist .
+run: python -m cibuildwheel --output-dir flink-python/dist flink-python
+env:
+  # Skip -musllinux on Linux builds
+  CIBW_SKIP: "*-musllinux*"
+  # Use manylinux2014 on Linux
+  CIBW_MANYLINUX_X86_64_IMAGE: manylinux2014
+  CIBW_BEFORE_ALL_LINUX: pip install patchelf
+  CIBW_BEFORE_BUILD_LINUX: pip install -r 
flink-python/dev/dev-requirements.txt
+  CIBW_ENVIRONMENT_LINUX: CFLAGS="-I. -include 
./dev/glibc_version_fix.h"
+  # Run auditwheel repair on Linux
+  CIBW_REPAIR_WHEEL_COMMAND_LINUX: "auditwheel repair -w {dest_dir} 
{wheel}"
+  # Skip repair on MacOS
+  CIBW_REPAIR_WHEEL_COMMAND_MACOS: ""
   - name: "Tar python artifact"
 run: tar -czvf python-wheel-${{ matrix.os_name }}.tar.gz -C 
flink-python/dist .
   - name: "Upload python artifact"
diff --git a/flink-python/pyproject.toml b/flink-python/pyproject.toml
index c21b2fccd5e..0ec9012fbc8 100644
--- a/flink-python/pyproject.toml
+++ b/flink-python/pyproject.toml
@@ -31,3 +31,6 @@ build = ["cp38-*", "cp39-*", "cp310-*", "cp311-*"]
 
 [tool.cibuildwheel.macos]
 archs = ["x86_64", "arm64"]
+
+[tool.cibuildwheel.linux]
+archs = ["x86_64"]



(flink) branch master updated (e56b54db40a -> c75248e48f4)

2024-07-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from e56b54db40a [FLINK-35625][cli] Merge "flink run" and "flink 
run-application" functionality, deprecate "run-application"
 new ab856fc6c8b [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow
 new c75248e48f4 [FLINK-34582] Updates cibuildwheel to support cpython 3.11 
wheel package

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/actions/stringify/action.yml| 42 +++
 .github/workflows/nightly.yml   | 51 +
 .github/workflows/template.flink-ci.yml | 15 --
 flink-python/pyproject.toml |  3 ++
 4 files changed, 101 insertions(+), 10 deletions(-)
 create mode 100644 .github/actions/stringify/action.yml



(flink) branch master updated: [FLINK-35551][runtime] Adds RescaleManager#onTrigger

2024-06-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 14140db6a46 [FLINK-35551][runtime] Adds RescaleManager#onTrigger
14140db6a46 is described below

commit 14140db6a4682384a8178e4eb2885103706abb79
Author: Matthias Pohl 
AuthorDate: Wed May 29 16:19:36 2024 +0200

[FLINK-35551][runtime] Adds RescaleManager#onTrigger

- integrates trigger logic in DefaultRescaleManager
- a new parameter jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
---
 .../generated/all_jobmanager_section.html  |   6 +
 .../generated/expert_scheduling_section.html   |   6 +
 .../generated/job_manager_configuration.html   |   6 +
 .../flink/configuration/JobManagerOptions.java |  19 +++
 .../scheduler/adaptive/AdaptiveScheduler.java  |  31 -
 .../adaptive/AdaptiveSchedulerFactory.java |   3 +-
 .../scheduler/adaptive/DefaultRescaleManager.java  |  88 -
 .../runtime/scheduler/adaptive/Executing.java  |   9 +-
 .../runtime/scheduler/adaptive/RescaleManager.java |   6 +
 .../adaptive/DefaultRescaleManagerTest.java| 145 +++--
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  16 ++-
 .../scheduler/adaptive/TestingRescaleManager.java  |  17 ++-
 12 files changed, 321 insertions(+), 31 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html 
b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 626aa50f386..4539ce365d5 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -8,6 +8,12 @@
 
 
 
+
+
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
+(none)
+Duration
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
 1
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html 
b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 86682f4fbfd..10e5ad134ce 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -86,6 +86,12 @@
 MemorySize
 The size of the write buffer of JobEventStore. The content 
will be flushed to external file system once the buffer is full
 
+
+
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
+(none)
+Duration
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
 1
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html 
b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index c01601a031a..df84946a5b0 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -8,6 +8,12 @@
 
 
 
+
+
jobmanager.adaptive-scheduler.max-delay-for-scale-trigger
+(none)
+Duration
+The maximum time the JobManager will wait with evaluating 
previously observed events for rescaling (default: 0ms if checkpointing is 
disabled and %dx of the checkpointing interval if checkpointing is 
enabled).
+
 
 
jobmanager.adaptive-scheduler.min-parallelism-increase
 1
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 77bfce2a9f8..ab95a0e2669 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 public class JobManagerOptions {
 
 public static final MemorySize MIN_JVM_HEAP_SIZE = 
MemorySize.ofMebiBytes(128);
+public static final int 
FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = 3;
 
 /**
  * The config parameter defining the network address to connect to for 
communication with the
@@ -573,6 +574,24 @@ public class JobManagerOptions {
 
code

(flink) branch master updated (7f1399561b3 -> ac4a275f0fe)

2024-06-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 7f1399561b3 [FLINK-35550][runtime] Adds new component RescaleManager 
that handles the rescaling logic to improve code testability and extensibility
 add c534e88bf12 [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e 
tests framework
 add ac4a275f0fe [FLINK-20398][test] solve deprecation in BatchSQLTest 
source table creation

No new revisions were added by this update.

Summary of changes:
 .../flink-batch-sql-test/pom.xml   |  69 
 .../flink/sql/tests/BatchSQLTestProgram.java   | 188 -
 .../org/apache/flink/sql/tests/BatchSQLTest.java   | 154 +
 .../java/org/apache/flink/sql/tests/Generator.java |  82 +
 .../flink/sql/tests/GeneratorTableSource.java  |  53 ++
 .../sql/tests/GeneratorTableSourceFactory.java |  86 ++
 .../org.apache.flink.table.factories.Factory   |  16 ++
 .../src/test/resources/log4j2-test.properties  |  33 
 .../src/test/resources/sql-job-query.sql   |  48 ++
 flink-end-to-end-tests/run-nightly-tests.sh|   3 -
 .../test-scripts/test_batch_sql.sh |  82 -
 .../testframe/source/FromElementsSource.java   |  42 -
 .../testframe/source/FromElementsSourceReader.java |  15 +-
 13 files changed, 555 insertions(+), 316 deletions(-)
 delete mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratorTableSource.java
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratorTableSourceFactory.java
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties
 create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/sql-job-query.sql
 delete mode 100755 flink-end-to-end-tests/test-scripts/test_batch_sql.sh



(flink) branch master updated: [FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility

2024-06-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f1399561b3 [FLINK-35550][runtime] Adds new component RescaleManager 
that handles the rescaling logic to improve code testability and extensibility
7f1399561b3 is described below

commit 7f1399561b3ff303ea48ec0b93eef79eaaf3e4c8
Author: Matthias Pohl 
AuthorDate: Thu May 23 16:56:56 2024 +0200

[FLINK-35550][runtime] Adds new component RescaleManager that handles the 
rescaling logic to improve code testability and extensibility

Rescaling is a state-specific functionality. Moving all the logic into 
Executing state allows
us to align the resource controlling in Executing state and 
WaitingForResources state in a future effort.
---
 .../scheduler/adaptive/AdaptiveScheduler.java  |  64 +--
 .../scheduler/adaptive/DefaultRescaleManager.java  | 167 +++
 .../runtime/scheduler/adaptive/Executing.java  | 175 +++
 .../runtime/scheduler/adaptive/RescaleManager.java |  64 +++
 .../EnforceMinimalIncreaseRescalingController.java |   7 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  46 +-
 .../adaptive/DefaultRescaleManagerTest.java| 553 +
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 181 +--
 .../scheduler/adaptive/TestingRescaleManager.java  |  53 ++
 ...orceMinimalIncreaseRescalingControllerTest.java |  12 +-
 10 files changed, 971 insertions(+), 351 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 7a8f3ae6570..c5310de8b5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -57,7 +57,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
@@ -110,9 +109,6 @@ import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
-import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
-import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
-import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
@@ -147,8 +143,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
+import static 
org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
 
 /**
@@ -228,7 +224,8 @@ public class AdaptiveScheduler
 .orElse(stabilizationTimeoutDefault),
 configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT),
 scalingIntervalMin,
-scalingIntervalMax);
+scalingIntervalMax,
+configuration.get(MIN_PARALLELISM_INCREASE));
 }
 
 private final SchedulerExecutionMode executionMode;
@@ -237,6 +234,7 @@ public class AdaptiveScheduler
 private final Duration slotIdleTimeout;
 private final Duration scalingIntervalMin;
 private final Duration scalingIntervalMax;
+private final int minParallelismChangeForDesiredRescale;
 
 private Settings(
 SchedulerExecutionMode executionMode,
@@ -244,13 +242,15 @@ public class AdaptiveScheduler
 Duration resourceStabilizationTimeout,
 Duration slotIdleTimeout,
 Duration scalingIntervalMin,
-Duration

(flink) branch master updated: Fix artifactId of flink-migration-test-utils module

2024-06-18 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new fdd0f80bf70 Fix artifactId of flink-migration-test-utils module
fdd0f80bf70 is described below

commit fdd0f80bf701cd868c6a650beb1742ebb6bef814
Author: wforget <643348...@qq.com>
AuthorDate: Thu Jun 6 15:59:19 2024 +0800

Fix artifactId of flink-migration-test-utils module
---
 flink-connectors/flink-hadoop-compatibility/pom.xml  | 2 +-
 flink-core/pom.xml   | 2 +-
 flink-formats/flink-avro/pom.xml | 2 +-
 flink-fs-tests/pom.xml   | 2 +-
 flink-libraries/flink-cep/pom.xml| 2 +-
 flink-runtime/pom.xml| 2 +-
 flink-scala/pom.xml  | 2 +-
 flink-streaming-java/pom.xml | 2 +-
 flink-table/flink-table-runtime/pom.xml  | 2 +-
 flink-test-utils-parent/flink-migration-test-utils/README.md | 4 ++--
 flink-test-utils-parent/flink-migration-test-utils/pom.xml   | 2 +-
 flink-tests-java17/pom.xml   | 2 +-
 flink-tests/pom.xml  | 2 +-
 13 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml 
b/flink-connectors/flink-hadoop-compatibility/pom.xml
index eb9ca61b47f..6b6307688b5 100644
--- a/flink-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -110,7 +110,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 4521e7464da..5525a986b80 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -175,7 +175,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 4121f4c500d..d33a2726b19 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -199,7 +199,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index 88157ae4257..83f84abde41 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -126,7 +126,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-libraries/flink-cep/pom.xml 
b/flink-libraries/flink-cep/pom.xml
index 93ffdabe6ae..5cc14c45039 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -105,7 +105,7 @@ under the License.
 
 
 org.apache.flink
-fink-migration-test-utils
+flink-migration-test-utils
 ${project.version}
 test
 
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 6008fe1717e..6f7df598e9d 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -287,7 +287,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 8f85d049aa1..f0a4791af42 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -127,7 +127,7 @@ under the License.
 

org.apache.flink
-   fink-migration-test-utils
+   flink-migration-test-utils
${project.version}
test

diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 946289c1e5e..548aa0f401c 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -119,7 +119,7 @@ under the License.
 

org.apache

(flink) branch FLINK-35550 deleted (was 04e7179738b)

2024-06-18 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch FLINK-35550
in repository https://gitbox.apache.org/repos/asf/flink.git


 was 04e7179738b JavaDoc fix

This change permanently discards the following revisions:

 discard 04e7179738b JavaDoc fix



svn commit: r69740 - /dev/flink/flink-1.19.1-rc1/ /release/flink/flink-1.19.1/

2024-06-14 Thread mapohl
Author: mapohl
Date: Fri Jun 14 14:53:55 2024
New Revision: 69740

Log:
Release Flink 1.19.1

Added:
release/flink/flink-1.19.1/
  - copied from r69739, dev/flink/flink-1.19.1-rc1/
Removed:
dev/flink/flink-1.19.1-rc1/



(flink) 01/01: JavaDoc fix

2024-06-11 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch FLINK-35550
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04e7179738b5dfea6a6929ce74b08306e1f6e438
Author: Matthias Pohl 
AuthorDate: Tue Jun 11 15:35:19 2024 +0200

JavaDoc fix
---
 .../flink/runtime/scheduler/adaptive/DefaultRescaleManager.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
index 1a35734a02c..3aac9d9385d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java
@@ -36,9 +36,9 @@ import java.time.temporal.Temporal;
 import java.util.function.Supplier;
 
 /**
- * {@code DefaultRescaleManager} manages the rescaling in depending on time of 
the previous rescale
- * operation and the available resources. It handles the event based on the 
following phases (in
- * that order):
+ * {@code DefaultRescaleManager} manages triggering the next rescaling based 
on when the previous
+ * rescale operation happened and the available resources. It handles the 
event based on the
+ * following phases (in that order):
  *
  * 
  *   Cooldown phase: No rescaling takes place (its upper threshold is 
defined by {@code



(flink) branch FLINK-35550 created (now 04e7179738b)

2024-06-11 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch FLINK-35550
in repository https://gitbox.apache.org/repos/asf/flink.git


  at 04e7179738b JavaDoc fix

This branch includes the following new commits:

 new 04e7179738b JavaDoc fix

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d98ab060be82fe3684d15501b9eb83373303d18
Author: Matthias Pohl 
AuthorDate: Wed Jan 31 15:02:24 2024 +0100

[FLINK-34324][test] Makes all s3 related operations being declared and 
called in a single location
---
 .../test-scripts/test_file_sink.sh | 117 +++--
 1 file changed, 62 insertions(+), 55 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..7b7728b44f8 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,89 +20,96 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-mkdir -p $OUTPUT_PATH
-
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
+# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download 
folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+mkdir -p "${LOCAL_JOB_OUTPUT_PATH}"
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}"
 
 ###
 # Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   OUTPUT_PATH
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
-  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + 
| sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop

(flink) branch release-1.18 updated (9d0858ee745 -> 09f7b070989)

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


from 9d0858ee745 [FLINK-34379][table] Fix OutOfMemoryError with large 
queries
 new 6f486e3d97d [hotfix][ci] Docker might return multiple port bindings
 new 7d98ab060be [FLINK-34324][test] Makes all s3 related operations being 
declared and called in a single location
 new 1196e9937db [FLINK-34508][ci] Makes FileSink e2e tests run with Minio 
instead of S3
 new 09f7b070989 [hotfix][tests] Removes warning that's produced when 
removing non-existing files

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test-scripts/common_s3_minio.sh|   4 +-
 flink-end-to-end-tests/test-scripts/common_ssl.sh  |   2 +-
 .../test-scripts/test_file_sink.sh | 118 +++--
 3 files changed, 66 insertions(+), 58 deletions(-)



(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f486e3d97dbb0b6d3c09c6085f6f6bab04977b1
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:42:32 2024 +0100

[hotfix][ci] Docker might return multiple port bindings

Adding the grep will work around this issue.
---
 flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh 
b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
index a56ce4e9410..d3d08392caa 100644
--- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
+++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
@@ -54,7 +54,7 @@ function s3_start {
   while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne 
"true" ]]; do
 sleep 0.1
   done
-  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed 
s'/0\.0\.0\.0/localhost/')"
+  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep 
-F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
   echo "Started minio @ $S3_ENDPOINT"
   on_exit s3_stop
 }
@@ -115,4 +115,4 @@ function s3_setup_with_provider {
   set_config_key "s3.path-style-access" "true"
 }
 
-source "$(dirname "$0")"/common_s3_operations.sh
\ No newline at end of file
+source "$(dirname "$0")"/common_s3_operations.sh



(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1196e9937db2f8866ef2c0ef896ef8056798337e
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:46:45 2024 +0100

[FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
---
 flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 7b7728b44f8..204c6442b7e 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -62,8 +62,7 @@ function get_total_number_of_valid_lines {
 if [ "${OUT_TYPE}" == "local" ]; then
   echo "[INFO] Test run in local environment: No S3 environment is loaded."
 elif [ "${OUT_TYPE}" == "s3" ]; then
-  # the s3 context requires additional
-  source "$(dirname "$0")"/common_s3.sh
+  source "$(dirname "$0")"/common_s3_minio.sh
   s3_setup hadoop
 
   # overwrites JOB_OUTPUT_PATH to point to S3
@@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then
   function out_cleanup {
 s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
 s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
-rollback_openssl_lib
   }
 
   on_exit out_cleanup
@@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo 
"dynamic"; else echo "static";
 echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
 
 set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback 
at the end of the test run
+on_exit rollback_openssl_lib
+
 set_config_key "metrics.fetcher.update-interval" "2000"
 # this test relies on global failovers
 set_config_key "jobmanager.execution.failover-strategy" "full"



(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09f7b070989a906d777a000e6ec3d9b45e192a29
Author: Matthias Pohl 
AuthorDate: Tue Apr 2 19:01:25 2024 +0200

[hotfix][tests] Removes warning that's produced when removing non-existing 
files
---
 flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh 
b/flink-end-to-end-tests/test-scripts/common_ssl.sh
index 2d99e29c4f5..8d4bc50b0a3 100644
--- a/flink-end-to-end-tests/test-scripts/common_ssl.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh
@@ -136,5 +136,5 @@ function set_conf_ssl {
 }
 
 function rollback_openssl_lib() {
-  rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
+  rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
 }



(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37bcfb867b652a2c970c5e552954100fb2c4dee9
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:46:45 2024 +0100

[FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
---
 flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 7b7728b44f8..204c6442b7e 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -62,8 +62,7 @@ function get_total_number_of_valid_lines {
 if [ "${OUT_TYPE}" == "local" ]; then
   echo "[INFO] Test run in local environment: No S3 environment is loaded."
 elif [ "${OUT_TYPE}" == "s3" ]; then
-  # the s3 context requires additional
-  source "$(dirname "$0")"/common_s3.sh
+  source "$(dirname "$0")"/common_s3_minio.sh
   s3_setup hadoop
 
   # overwrites JOB_OUTPUT_PATH to point to S3
@@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then
   function out_cleanup {
 s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
 s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
-rollback_openssl_lib
   }
 
   on_exit out_cleanup
@@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo 
"dynamic"; else echo "static";
 echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
 
 set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback 
at the end of the test run
+on_exit rollback_openssl_lib
+
 set_config_key "metrics.fetcher.update-interval" "2000"
 # this test relies on global failovers
 set_config_key "jobmanager.execution.failover-strategy" "full"



(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95fb6c1e363c0e9c7f6aebeb66d8ceb7a33cb9d8
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:42:32 2024 +0100

[hotfix][ci] Docker might return multiple port bindings

Adding the grep will work around this issue.
---
 flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh 
b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
index a56ce4e9410..d3d08392caa 100644
--- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
+++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
@@ -54,7 +54,7 @@ function s3_start {
   while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne 
"true" ]]; do
 sleep 0.1
   done
-  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed 
s'/0\.0\.0\.0/localhost/')"
+  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep 
-F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
   echo "Started minio @ $S3_ENDPOINT"
   on_exit s3_stop
 }
@@ -115,4 +115,4 @@ function s3_setup_with_provider {
   set_config_key "s3.path-style-access" "true"
 }
 
-source "$(dirname "$0")"/common_s3_operations.sh
\ No newline at end of file
+source "$(dirname "$0")"/common_s3_operations.sh



(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 500e92c9eba44f8f79eacf2dbc2bc714b8f66399
Author: Matthias Pohl 
AuthorDate: Tue Apr 2 19:01:25 2024 +0200

[hotfix][tests] Removes warning that's produced when removing non-existing 
files
---
 flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh 
b/flink-end-to-end-tests/test-scripts/common_ssl.sh
index 2d99e29c4f5..8d4bc50b0a3 100644
--- a/flink-end-to-end-tests/test-scripts/common_ssl.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh
@@ -136,5 +136,5 @@ function set_conf_ssl {
 }
 
 function rollback_openssl_lib() {
-  rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
+  rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
 }



(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8707c63ee147085671a9ae1b294854bac03fc914
Author: Matthias Pohl 
AuthorDate: Wed Jan 31 15:02:24 2024 +0100

[FLINK-34324][test] Makes all s3 related operations being declared and 
called in a single location
---
 .../test-scripts/test_file_sink.sh | 117 +++--
 1 file changed, 62 insertions(+), 55 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..7b7728b44f8 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,89 +20,96 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-mkdir -p $OUTPUT_PATH
-
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
+# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download 
folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+mkdir -p "${LOCAL_JOB_OUTPUT_PATH}"
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}"
 
 ###
 # Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   OUTPUT_PATH
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
-  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + 
| sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop

(flink) branch release-1.19 updated (17e7c3eaf14 -> 500e92c9eba)

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


from 17e7c3eaf14 [FLINK-35184][table-runtime] Fix mini-batch join hash 
collision when use InputSideHasNoUniqueKeyBundle (#24749)
 new 95fb6c1e363 [hotfix][ci] Docker might return multiple port bindings
 new 8707c63ee14 [FLINK-34324][test] Makes all s3 related operations being 
declared and called in a single location
 new 37bcfb867b6 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio 
instead of S3
 new 500e92c9eba [hotfix][tests] Removes warning that's produced when 
removing non-existing files

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test-scripts/common_s3_minio.sh|   4 +-
 flink-end-to-end-tests/test-scripts/common_ssl.sh  |   2 +-
 .../test-scripts/test_file_sink.sh | 118 +++--
 3 files changed, 66 insertions(+), 58 deletions(-)



(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93526c2f3247598ce80854cf65dd4440eb5aaa43
Author: Matthias Pohl 
AuthorDate: Wed Jan 31 15:02:24 2024 +0100

[FLINK-34324][test] Makes all s3 related operations being declared and 
called in a single location
---
 .../test-scripts/test_file_sink.sh | 117 +++--
 1 file changed, 62 insertions(+), 55 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..7b7728b44f8 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,89 +20,96 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-mkdir -p $OUTPUT_PATH
-
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
+# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download 
folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+mkdir -p "${LOCAL_JOB_OUTPUT_PATH}"
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}"
 
 ###
 # Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   OUTPUT_PATH
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   sorted content of part files
 ###
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
-  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + 
| sort -g
 }
 
 ###
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop

(flink) branch master updated (4fe66e06974 -> 046872cc8dd)

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 4fe66e06974 [FLINK-32084][checkpoint] Migrate current file merging of 
channel state snapshot into the unify file merging framework
 new b2cdb4aac98 [hotfix][ci] Docker might return multiple port bindings
 new 93526c2f324 [FLINK-34324][test] Makes all s3 related operations being 
declared and called in a single location
 new 91e6e7eb913 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio 
instead of S3
 new 046872cc8dd [hotfix][tests] Removes warning that's produced when 
removing non-existing files

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test-scripts/common_s3_minio.sh|   4 +-
 flink-end-to-end-tests/test-scripts/common_ssl.sh  |   2 +-
 .../test-scripts/test_file_sink.sh | 118 +++--
 3 files changed, 66 insertions(+), 58 deletions(-)



(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 046872cc8ddf9af14af1f797200065944fca5b64
Author: Matthias Pohl 
AuthorDate: Tue Apr 2 19:01:25 2024 +0200

[hotfix][tests] Removes warning that's produced when removing non-existing 
files
---
 flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh 
b/flink-end-to-end-tests/test-scripts/common_ssl.sh
index 2d99e29c4f5..8d4bc50b0a3 100644
--- a/flink-end-to-end-tests/test-scripts/common_ssl.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh
@@ -136,5 +136,5 @@ function set_conf_ssl {
 }
 
 function rollback_openssl_lib() {
-  rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
+  rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar
 }



(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91e6e7eb913043371b032b00012c5d87b87fa3bc
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:46:45 2024 +0100

[FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
---
 flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 7b7728b44f8..204c6442b7e 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -62,8 +62,7 @@ function get_total_number_of_valid_lines {
 if [ "${OUT_TYPE}" == "local" ]; then
   echo "[INFO] Test run in local environment: No S3 environment is loaded."
 elif [ "${OUT_TYPE}" == "s3" ]; then
-  # the s3 context requires additional
-  source "$(dirname "$0")"/common_s3.sh
+  source "$(dirname "$0")"/common_s3_minio.sh
   s3_setup hadoop
 
   # overwrites JOB_OUTPUT_PATH to point to S3
@@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then
   function out_cleanup {
 s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
 s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
-rollback_openssl_lib
   }
 
   on_exit out_cleanup
@@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo 
"dynamic"; else echo "static";
 echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
 
 set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback 
at the end of the test run
+on_exit rollback_openssl_lib
+
 set_config_key "metrics.fetcher.update-interval" "2000"
 # this test relies on global failovers
 set_config_key "jobmanager.execution.failover-strategy" "full"



(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings

2024-05-10 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2cdb4aac98b1858ba63596e6c6a3bddf3f274da
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 11:42:32 2024 +0100

[hotfix][ci] Docker might return multiple port bindings

Adding the grep will work around this issue.
---
 flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh 
b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
index a56ce4e9410..d3d08392caa 100644
--- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
+++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
@@ -54,7 +54,7 @@ function s3_start {
   while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne 
"true" ]]; do
 sleep 0.1
   done
-  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed 
s'/0\.0\.0\.0/localhost/')"
+  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep 
-F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
   echo "Started minio @ $S3_ENDPOINT"
   on_exit s3_stop
 }
@@ -115,4 +115,4 @@ function s3_setup_with_provider {
   set_config_key "s3.path-style-access" "true"
 }
 
-source "$(dirname "$0")"/common_s3_operations.sh
\ No newline at end of file
+source "$(dirname "$0")"/common_s3_operations.sh



(flink) branch release-1.18 updated: [FLINK-35000][build] Updates link to test code convention in pull request template

2024-04-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new 9150f93b18b [FLINK-35000][build] Updates link to test code convention 
in pull request template
9150f93b18b is described below

commit 9150f93b18b8694646092a6ed24a14e3653f613f
Author: Matthias Pohl 
AuthorDate: Wed Apr 3 14:08:27 2024 +0200

[FLINK-35000][build] Updates link to test code convention in pull request 
template
---
 .github/PULL_REQUEST_TEMPLATE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 8b0cbf71ea8..2427fd4f01f 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -39,7 +39,7 @@
 
 ## Verifying this change
 
-Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
+Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
 
 *(Please pick either of the following options)*
 



(flink) branch release-1.19 updated: [FLINK-35000][build] Updates link to test code convention in pull request template

2024-04-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new eb58599b434 [FLINK-35000][build] Updates link to test code convention 
in pull request template
eb58599b434 is described below

commit eb58599b434b6c5fe86f6e487ce88315c98b4ec3
Author: Matthias Pohl 
AuthorDate: Wed Apr 3 14:08:27 2024 +0200

[FLINK-35000][build] Updates link to test code convention in pull request 
template
---
 .github/PULL_REQUEST_TEMPLATE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 8b0cbf71ea8..2427fd4f01f 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -39,7 +39,7 @@
 
 ## Verifying this change
 
-Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
+Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
 
 *(Please pick either of the following options)*
 



(flink) branch master updated: [FLINK-35000][build] Updates link to test code convention in pull request template

2024-04-03 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d301839dfe2 [FLINK-35000][build] Updates link to test code convention 
in pull request template
d301839dfe2 is described below

commit d301839dfe2ed9b1313d23f8307bda76868a0c0a
Author: Matthias Pohl 
AuthorDate: Wed Apr 3 14:08:27 2024 +0200

[FLINK-35000][build] Updates link to test code convention in pull request 
template
---
 .github/PULL_REQUEST_TEMPLATE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 8b0cbf71ea8..2427fd4f01f 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -39,7 +39,7 @@
 
 ## Verifying this change
 
-Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
+Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
 
 *(Please pick either of the following options)*
 



(flink) branch release-1.19 updated: [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain

2024-04-02 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new ece4faee055 [FLINK-33816][streaming] Fix unstable test 
SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
ece4faee055 is described below

commit ece4faee055b3797b39e9c0b55f3e94a3db2f912
Author: Jiabao Sun 
AuthorDate: Tue Jan 2 14:33:17 2024 +0800

[FLINK-33816][streaming] Fix unstable test 
SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
---
 .../org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 29bcf6b5990..5e47d36040b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -708,7 +708,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase 
{
 harness.streamTask.runMailboxLoop();
 harness.finishProcessing();
 
-assertThat(triggerResult.isDone()).isTrue();
 assertThat(triggerResult.get()).isTrue();
 assertThat(checkpointCompleted.isDone()).isTrue();
 }



(flink) branch release-1.19 updated: [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored

2024-03-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new c11656a2406 [FLINK-34933][test] Fixes 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
c11656a2406 is described below

commit c11656a2406f07e2ae7cd6f80c46afb14385ee0e
Author: Matthias Pohl 
AuthorDate: Mon Mar 25 11:33:13 2024 +0100

[FLINK-34933][test] Fixes 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
---
 .../JobMasterServiceLeadershipRunnerTest.java  | 24 +-
 .../jobmaster/TestingJobMasterServiceProcess.java  |  4 +++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index a088d3f32c1..ee5cbcecc1a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -499,11 +499,33 @@ class JobMasterServiceLeadershipRunnerTest {
 
 leaderElection.notLeader();
 
+assertThat(jobManagerRunner.getResultFuture())
+.as("The runner result should not be completed by the 
leadership revocation.")
+.isNotDone();
+
 resultFuture.complete(
 JobManagerRunnerResult.forSuccess(
 createFailedExecutionGraphInfo(new 
FlinkException("test exception";
 
-assertThatFuture(jobManagerRunner.getResultFuture()).eventuallyFails();
+assertThat(jobManagerRunner.getResultFuture())
+.as("The runner result should not be completed if the 
leadership is lost.")
+.isNotDone();
+
+jobManagerRunner.closeAsync().get();
+
+assertThatFuture(jobManagerRunner.getResultFuture())
+.eventuallySucceeds()
+.as(
+"The runner result should be completed with a 
SUSPENDED job status if the job didn't finish when closing the runner, yet.")
+.satisfies(
+result -> {
+assertThat(result.isSuccess()).isTrue();
+assertThat(
+result.getExecutionGraphInfo()
+
.getArchivedExecutionGraph()
+.getState())
+.isEqualTo(JobStatus.SUSPENDED);
+});
 }
 
 @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
index c075d88fa09..8932bc8dd22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -87,7 +88,8 @@ public class TestingJobMasterServiceProcess implements 
JobMasterServiceProcess {
 public static final class Builder {
 
 private UUID leaderSessionId = UUID.randomUUID();
-private Supplier> closeAsyncSupplier = 
unsupportedOperation();
+private Supplier> closeAsyncSupplier =
+FutureUtils::completedVoidFuture;
 private Supplier isInitializedAndRunningSupplier = 
unsupportedOperation();
 private Supplier> 
getJobMasterGatewayFutureSupplier =
 () ->



(flink) branch release-1.18 updated (a6aa569f500 -> 94d1363c27e)

2024-03-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


from a6aa569f500 [FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.
 add 94d1363c27e [FLINK-34933][test] Fixes 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored

No new revisions were added by this update.

Summary of changes:
 .../JobMasterServiceLeadershipRunnerTest.java  | 24 +-
 .../jobmaster/TestingJobMasterServiceProcess.java  |  4 +++-
 2 files changed, 26 insertions(+), 2 deletions(-)



(flink) branch master updated: [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored

2024-03-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1668a072769 [FLINK-34933][test] Fixes 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
1668a072769 is described below

commit 1668a07276929416469392a35a77ba7699aac30b
Author: Matthias Pohl 
AuthorDate: Mon Mar 25 11:33:13 2024 +0100

[FLINK-34933][test] Fixes 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
---
 .../JobMasterServiceLeadershipRunnerTest.java  | 24 +-
 .../jobmaster/TestingJobMasterServiceProcess.java  |  4 +++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index a088d3f32c1..ee5cbcecc1a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -499,11 +499,33 @@ class JobMasterServiceLeadershipRunnerTest {
 
 leaderElection.notLeader();
 
+assertThat(jobManagerRunner.getResultFuture())
+.as("The runner result should not be completed by the 
leadership revocation.")
+.isNotDone();
+
 resultFuture.complete(
 JobManagerRunnerResult.forSuccess(
 createFailedExecutionGraphInfo(new 
FlinkException("test exception";
 
-assertThatFuture(jobManagerRunner.getResultFuture()).eventuallyFails();
+assertThat(jobManagerRunner.getResultFuture())
+.as("The runner result should not be completed if the 
leadership is lost.")
+.isNotDone();
+
+jobManagerRunner.closeAsync().get();
+
+assertThatFuture(jobManagerRunner.getResultFuture())
+.eventuallySucceeds()
+.as(
+"The runner result should be completed with a 
SUSPENDED job status if the job didn't finish when closing the runner, yet.")
+.satisfies(
+result -> {
+assertThat(result.isSuccess()).isTrue();
+assertThat(
+result.getExecutionGraphInfo()
+
.getArchivedExecutionGraph()
+.getState())
+.isEqualTo(JobStatus.SUSPENDED);
+});
 }
 
 @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
index c075d88fa09..8932bc8dd22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -87,7 +88,8 @@ public class TestingJobMasterServiceProcess implements 
JobMasterServiceProcess {
 public static final class Builder {
 
 private UUID leaderSessionId = UUID.randomUUID();
-private Supplier> closeAsyncSupplier = 
unsupportedOperation();
+private Supplier> closeAsyncSupplier =
+FutureUtils::completedVoidFuture;
 private Supplier isInitializedAndRunningSupplier = 
unsupportedOperation();
 private Supplier> 
getJobMasterGatewayFutureSupplier =
 () ->



(flink) branch master updated (f31c128bfc4 -> 83f82ab0c86)

2024-03-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from f31c128bfc4 [FLINK-34956][doc] Fix the config type wrong of Duration
 add 83f82ab0c86 [FLINK-33376][coordination] Extend ZooKeeper Curator 
configurations

No new revisions were added by this update.

Summary of changes:
 .../generated/high_availability_configuration.html | 18 
 .../configuration/HighAvailabilityOptions.java | 49 ++
 .../apache/flink/runtime/util/ZooKeeperUtils.java  | 40 ++
 3 files changed, 107 insertions(+)



(flink-docker) branch dev-1.18 updated (9a5b2d9 -> 6ec55b2)

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch dev-1.18
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


from 9a5b2d9  [FLINK-34165] Update apache download url
 new d93d911  [FLINK-34419][docker] Adds JDK17 support
 new 69d1638  [hotfix][ci] Utilizes matrix strategy in GHA workflow
 new 6ec55b2  [hotfix][ci] Updates actions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)



(flink-docker) 03/03: [hotfix][ci] Updates actions

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.18
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 6ec55b25d815f08953b6cb866ba1a77974b3cfd4
Author: morazow 
AuthorDate: Thu Feb 29 12:34:14 2024 +0100

[hotfix][ci] Updates actions
---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 66d60a2..baec748 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -29,7 +29,7 @@ jobs:
   matrix:
 java_version: [ 8, 11, 17 ]
 steps:
-  - uses: actions/checkout@v3
+  - uses: actions/checkout@v4
   - name: "Build images"
 run: |
   ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"



(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 support

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.18
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit d93d911b015e535fc2b6f1426c3b36229ff3d02a
Author: morazow 
AuthorDate: Thu Feb 29 12:32:00 2024 +0100

[FLINK-34419][docker] Adds JDK17 support
---
 .github/workflows/ci.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f1bbb24..083b6f2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,5 +26,6 @@ jobs:
 run: |
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.18
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 69d16387bb9eb573a8f2750c1509f7b5e7cdce17
Author: morazow 
AuthorDate: Thu Feb 29 12:33:51 2024 +0100

[hotfix][ci] Utilizes matrix strategy in GHA workflow
---
 .github/workflows/ci.yml | 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 083b6f2..66d60a2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,15 +17,21 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  FLINK_TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz";
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  matrix:
+java_version: [ 8, 11, 17 ]
 steps:
   - uses: actions/checkout@v3
   - name: "Build images"
 run: |
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
+  ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) 03/03: [hotfix][ci] Updates actions

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.19
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit e96fc75c468135ca5ba31d7b2ced7796a18b44c1
Author: morazow 
AuthorDate: Thu Feb 29 12:28:15 2024 +0100

[hotfix][ci] Updates actions
---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d789a8d..0001d45 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -29,7 +29,7 @@ jobs:
   matrix:
 java_version: [ 8, 11, 17, 21 ]
 steps:
-  - uses: actions/checkout@v3
+  - uses: actions/checkout@v4
   - name: "Build images"
 run: |
   ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"



(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.19
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 53909c0b193de539ab96c1798c0ad39b3ed4fc31
Author: morazow 
AuthorDate: Thu Feb 29 12:27:56 2024 +0100

[hotfix][ci] Utilizes matrix strategy in GHA workflow
---
 .github/workflows/ci.yml | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 40dc094..d789a8d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,16 +17,21 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  FLINK_TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz";
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  matrix:
+java_version: [ 8, 11, 17, 21 ]
 steps:
   - uses: actions/checkout@v3
   - name: "Build images"
 run: |
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 21 -n test-java21
+  ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 and JDK21 support

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-1.19
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 67d7c46ed382a665e941f0cf1f1606d10f87dee5
Author: morazow 
AuthorDate: Thu Feb 29 12:25:00 2024 +0100

[FLINK-34419][docker] Adds JDK17 and JDK21 support
---
 .github/workflows/ci.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a486206..40dc094 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,5 +26,7 @@ jobs:
 run: |
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 21 -n test-java21
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) branch dev-1.19 updated (7549f2d -> e96fc75)

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch dev-1.19
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


from 7549f2d  Add GPG key for 1.19.0 release
 new 67d7c46  [FLINK-34419][docker] Adds JDK17 and JDK21 support
 new 53909c0  [hotfix][ci] Utilizes matrix strategy in GHA workflow
 new e96fc75  [hotfix][ci] Updates actions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)



(flink-docker) 03/03: [hotfix][ci] Updates actions

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit c350c6167033518011abd62b6791fc1b1e8efe2e
Author: morazow 
AuthorDate: Thu Feb 29 12:18:49 2024 +0100

[hotfix][ci] Updates actions
---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ba75faa..3af25de 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -29,7 +29,7 @@ jobs:
   matrix:
 java_version: [ 8, 11, 17, 21 ]
 steps:
-  - uses: actions/checkout@v3
+  - uses: actions/checkout@v4
   - name: "Build images"
 run: |
   ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"



(flink-docker) branch dev-master updated (dd78da0 -> c350c61)

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch dev-master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


from dd78da0  [hotfix] Improve docker-entrypoint.sh to Use Arrays for 
Configuration Parameters Instead of Eval and String Concatenation
 new 1460077  [FLINK-34419][docker] Adds JDK17 and JDK21 support
 new 4b77c1a  [hotfix][ci] Utilizes matrix strategy in GHA workflow
 new c350c61  [hotfix][ci] Updates actions

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)



(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 and JDK21 support

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 1460077743b29e17edd0a2d7efd3897fa097988d
Author: morazow 
AuthorDate: Thu Feb 29 12:08:30 2024 +0100

[FLINK-34419][docker] Adds JDK17 and JDK21 support
---
 .github/workflows/ci.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6e45a73..0c6c6d6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,5 +26,7 @@ jobs:
 run: |
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
   ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
+  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 21 -n test-java21
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch dev-master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 4b77c1a9bb2ffd362cafb79cc775db41e6cd45c6
Author: morazow 
AuthorDate: Thu Feb 29 12:17:46 2024 +0100

[hotfix][ci] Utilizes matrix strategy in GHA workflow
---
 .github/workflows/ci.yml | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0c6c6d6..ba75faa 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -17,16 +17,21 @@ name: "CI"
 
 on: [push, pull_request]
 
+env:
+  FLINK_TAR_URL: 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz";
+
 jobs:
   ci:
+name: CI using JDK ${{ matrix.java_version }}
 runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  matrix:
+java_version: [ 8, 11, 17, 21 ]
 steps:
   - uses: actions/checkout@v3
   - name: "Build images"
 run: |
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 8 -n test-java8
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 11 -n test-java11
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 17 -n test-java17
-  ./add-custom.sh -u 
"https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz"; 
-j 21 -n test-java21
+  ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n 
"test-java${{ matrix.java_version }}"
   - name: "Test images"
 run: testing/run_tests.sh



(flink-docker) 01/02: [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ versions

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 9e0041a2c9dace4bf3f32815e3e24e24385b179b
Author: morazow 
AuthorDate: Thu Feb 29 09:30:14 2024 +0100

[FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ versions
---
 .github/workflows/snapshot.yml | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/snapshot.yml b/.github/workflows/snapshot.yml
index 2389cd1..a262ecc 100644
--- a/.github/workflows/snapshot.yml
+++ b/.github/workflows/snapshot.yml
@@ -36,15 +36,18 @@ jobs:
 strategy:
   max-parallel: 1
   matrix:
-java_version: [8, 11]
 build:
   - flink_version: 1.20-SNAPSHOT
+java_version: [8, 11, 17, 21]
 branch: dev-master
   - flink_version: 1.19-SNAPSHOT
+java_version: [8, 11, 17, 21]
 branch: dev-1.19
   - flink_version: 1.18-SNAPSHOT
+java_version: [8, 11, 17]
 branch: dev-1.18
   - flink_version: 1.17-SNAPSHOT
+java_version: [8, 11]
 branch: dev-1.17
 steps:
   - uses: actions/checkout@v3
@@ -63,7 +66,7 @@ jobs:
   - name: Log in to the Container registry
 uses: docker/login-action@v1
 with:
-  registry: ghcr.io
+  registry: ${{ env.REGISTRY }}
   username: ${{ github.actor }}
   password: ${{ secrets.GITHUB_TOKEN }}
 



(flink-docker) 02/02: [FLINK-34314][docker] Updated GitHub actions versions

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git

commit 54c53458ad99bfb21acca66d5c6e91b5812c26ce
Author: morazow 
AuthorDate: Thu Feb 29 09:32:16 2024 +0100

[FLINK-34314][docker] Updated GitHub actions versions
---
 .github/workflows/snapshot.yml | 13 +
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/snapshot.yml b/.github/workflows/snapshot.yml
index a262ecc..063efbf 100644
--- a/.github/workflows/snapshot.yml
+++ b/.github/workflows/snapshot.yml
@@ -50,21 +50,18 @@ jobs:
 java_version: [8, 11]
 branch: dev-1.17
 steps:
-  - uses: actions/checkout@v3
+  - uses: actions/checkout@v4
 with:
   ref: ${{ matrix.build.branch }}
 
   - name: Set up QEMU
-uses: docker/setup-qemu-action@v1
-with:
-  image: tonistiigi/binfmt:latest
-  platforms: all
+uses: docker/setup-qemu-action@v3
 
   - name: Set up Docker Buildx
-uses: docker/setup-buildx-action@v1
+uses: docker/setup-buildx-action@v3
 
   - name: Log in to the Container registry
-uses: docker/login-action@v1
+uses: docker/login-action@v3
 with:
   registry: ${{ env.REGISTRY }}
   username: ${{ github.actor }}
@@ -82,7 +79,7 @@ jobs:
 run: env
 
   - name: Build and push Docker images (supported platforms)
-uses: docker/bake-action@v1.7.0
+uses: docker/bake-action@v4
 with:
   files: |
 .github/workflows/docker-bake.hcl



(flink-docker) branch master updated (20017e8 -> 54c5345)

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-docker.git


from 20017e8  [FLINK-34701][release] Update docker-entrypoint.sh for 1.19.0
 new 9e0041a  [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 
1.18+ versions
 new 54c5345  [FLINK-34314][docker] Updated GitHub actions versions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/snapshot.yml | 20 ++--
 1 file changed, 10 insertions(+), 10 deletions(-)



(flink) branch release-1.18 updated: [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again.

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new a6aa569f500 [FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.
a6aa569f500 is described below

commit a6aa569f5005041934a2e6398b6749584beeaabd
Author: Matthias Pohl 
AuthorDate: Wed Mar 20 16:06:58 2024 +0100

[FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.
---
 .../JobMasterServiceLeadershipRunnerTest.java  | 27 ++
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index dc78a9e9ed9..3688f6129fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -45,18 +45,19 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
@@ -676,26 +677,25 @@ class JobMasterServiceLeadershipRunnerTest {
 }
 }
 
-@Disabled
 @Test
 void 
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
 throws Exception {
 final AtomicReference 
storedLeaderInformation =
 new AtomicReference<>(LeaderInformationRegister.empty());
+final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean();
 
 final TestingLeaderElectionDriver.Factory driverFactory =
 new TestingLeaderElectionDriver.Factory(
 TestingLeaderElectionDriver.newBuilder(
-new AtomicBoolean(), storedLeaderInformation, 
new AtomicBoolean()));
+haBackendLeadershipFlag,
+storedLeaderInformation,
+new AtomicBoolean()));
 
 // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
 // in connection with the DefaultLeaderElectionService generates the 
nested locking
 final DefaultLeaderElectionService defaultLeaderElectionService =
 new DefaultLeaderElectionService(driverFactory, 
fatalErrorHandler);
 
-final TestingLeaderElectionDriver currentLeaderDriver =
-driverFactory.assertAndGetOnlyCreatedDriver();
-
 // latch to detect when we reached the first synchronized section 
having a lock on the
 // JobMasterServiceProcess#stop side
 final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
@@ -733,6 +733,7 @@ class JobMasterServiceLeadershipRunnerTest {
 // before calling stop 
on the
 // 
DefaultLeaderElectionService
 
triggerClassLoaderLeaseRelease.await();
+
 // In order to 
reproduce the deadlock, we
 // need to ensure that
 // 
leaderContender#grantLeadership can be
@@ -770,13 +771,19 @@ class JobMasterServiceLeadershipRunnerTest {
 jobManagerRunner.start();
 
 // grant leadership to create jobMasterServiceProcess
+haBackendLeadershipFlag.set(true);
 final UUID leaderSessionID = UUID.randomUUID();
 defaultLeaderElectionService.onGrantLeadership(leaderSessionID);
 
-   

(flink) branch release-1.19 updated: [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again.

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new 6b5c48ff53d [FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.
6b5c48ff53d is described below

commit 6b5c48ff53ddc6e75056a9050afded2ac44a413a
Author: Matthias Pohl 
AuthorDate: Wed Mar 20 16:06:58 2024 +0100

[FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.
---
 .../JobMasterServiceLeadershipRunnerTest.java  | 27 ++
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 5a6f82a75d5..a088d3f32c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -45,18 +45,19 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
@@ -676,26 +677,25 @@ class JobMasterServiceLeadershipRunnerTest {
 }
 }
 
-@Disabled
 @Test
 void 
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
 throws Exception {
 final AtomicReference 
storedLeaderInformation =
 new AtomicReference<>(LeaderInformationRegister.empty());
+final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean();
 
 final TestingLeaderElectionDriver.Factory driverFactory =
 new TestingLeaderElectionDriver.Factory(
 TestingLeaderElectionDriver.newBuilder(
-new AtomicBoolean(), storedLeaderInformation, 
new AtomicBoolean()));
+haBackendLeadershipFlag,
+storedLeaderInformation,
+new AtomicBoolean()));
 
 // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
 // in connection with the DefaultLeaderElectionService generates the 
nested locking
 final DefaultLeaderElectionService defaultLeaderElectionService =
 new DefaultLeaderElectionService(driverFactory, 
fatalErrorHandler);
 
-final TestingLeaderElectionDriver currentLeaderDriver =
-driverFactory.assertAndGetOnlyCreatedDriver();
-
 // latch to detect when we reached the first synchronized section 
having a lock on the
 // JobMasterServiceProcess#stop side
 final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
@@ -733,6 +733,7 @@ class JobMasterServiceLeadershipRunnerTest {
 // before calling stop 
on the
 // 
DefaultLeaderElectionService
 
triggerClassLoaderLeaseRelease.await();
+
 // In order to 
reproduce the deadlock, we
 // need to ensure that
 // 
leaderContender#grantLeadership can be
@@ -770,13 +771,19 @@ class JobMasterServiceLeadershipRunnerTest {
 jobManagerRunner.start();
 
 // grant leadership to create jobMasterServiceProcess
+haBackendLeadershipFlag.set(true);
 final UUID leaderSessionID = UUID.randomUUID();
 defaultLeaderElectionService.onGrantLeadership(leaderSessionID);
 
-   

(flink) branch master updated (d4c1a0a1ba4 -> 0e70d89ad9f)

2024-03-27 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from d4c1a0a1ba4 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase
 add 0e70d89ad9f [FLINK-34897][test] Enables 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 again.

No new revisions were added by this update.

Summary of changes:
 .../JobMasterServiceLeadershipRunnerTest.java  | 27 ++
 1 file changed, 17 insertions(+), 10 deletions(-)



(flink) 04/04: [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2a6ff5a97bf27d68be1188c05158e18df810549
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 16:34:19 2024 +0100

[FLINK-34409][ci] Enable any still disabled e2e tests for the 
AdaptiveScheduler
---
 flink-end-to-end-tests/run-nightly-tests.sh | 46 ++---
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index f6d45598088..e6d088133ea 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -125,30 +125,28 @@ function run_group_1 {
 # Docker / Container / Kubernetes tests
 

 
-if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then
-run_test "Wordcount on Docker test (custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh dummy-fs"
-
-run_test "Run Kubernetes test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh"
-run_test "Run kubernetes session test (default input)" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh"
-run_test "Run kubernetes session test (custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs"
-run_test "Run kubernetes application test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh"
-run_test "Run kubernetes application HA test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh"
-run_test "Run Kubernetes IT test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh"
-
-run_test "Running Flink over NAT end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions"
-
-if [[ `uname -i` != 'aarch64' ]]; then
-# Skip PyFlink e2e test, because MiniConda and Pyarrow which 
Pyflink depends doesn't support aarch64 currently.
-run_test "Run kubernetes pyflink application test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_pyflink_application.sh"
-
-# Hadoop YARN deosn't support aarch64 at this moment. See: 
https://issues.apache.org/jira/browse/HADOOP-16723
-# These tests are known to fail on JDK11. See FLINK-13719
-if [[ ${PROFILE} != *"jdk11"* ]]; then
-run_test "Running Kerberized YARN per-job on Docker test 
(default input)" "$END_TO_END_DIR/test-scripts/test_yarn_job_kerberos_docker.sh"
-run_test "Running Kerberized YARN per-job on Docker test 
(custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_yarn_job_kerberos_docker.sh dummy-fs"
-run_test "Running Kerberized YARN application on Docker test 
(default input)" 
"$END_TO_END_DIR/test-scripts/test_yarn_application_kerberos_docker.sh"
-run_test "Running Kerberized YARN application on Docker test 
(custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_yarn_application_kerberos_docker.sh dummy-fs"
-fi
+run_test "Wordcount on Docker test (custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh dummy-fs"
+
+run_test "Run Kubernetes test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh"
+run_test "Run kubernetes session test (default input)" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh"
+run_test "Run kubernetes session test (custom fs plugin)" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs"
+run_test "Run kubernetes application test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh"
+run_test "Run kubernetes application HA test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh"
+run_test "Run Kubernetes IT test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh"
+
+run_test "Running Flink over NAT end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions"
+
+if [[ `uname -i` != 'aarch64' ]]; then
+# Skip PyFlink e2e test, because MiniConda and Pyarrow which Pyflink 
depends doesn't support aarch64 currently.
+run_test "Run kubernetes pyflink application test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_pyflink_application.sh"
+
+# Hadoop YARN doesn't

(flink) branch release-1.18 updated (940b3bbda5b -> f2a6ff5a97b)

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


from 940b3bbda5b [FLINK-32513][core] Add predecessor caching
 new 8f6890fbd75 [FLINK-21400][ci] Enables FileSink and Stateful stream job 
e2e test for the AdaptiveScheduler
 new f5c243097ac [FLINK-21450][test] Enables tests that were disabled for 
the AdaptiveScheduler
 new 836b332b2d1 [FLINK-21535][test] Adds proper comment to test methods 
that are disabled for the AdaptiveScheduler
 new f2a6ff5a97b [FLINK-34409][ci] Enable any still disabled e2e tests for 
the AdaptiveScheduler

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../file/src/FileSourceTextLinesITCase.java|  2 -
 flink-end-to-end-tests/run-nightly-tests.sh| 60 ++
 .../flink/runtime/jobmaster/JobMasterTest.java |  4 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  3 --
 .../checkpointing/UnalignedCheckpointTestBase.java | 12 -
 .../DefaultSchedulerLocalRecoveryITCase.java   |  9 +++-
 6 files changed, 48 insertions(+), 42 deletions(-)



(flink) 03/04: [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 836b332b2d100e21b1d0008257a009d9ec09e13a
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 15:59:11 2024 +0100

[FLINK-21535][test] Adds proper comment to test methods that are disabled 
for the AdaptiveScheduler
---
 .../flink/test/checkpointing/UnalignedCheckpointITCase.java  |  3 ---
 .../test/checkpointing/UnalignedCheckpointTestBase.java  | 12 ++--
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index c290ba0e39f..1f587dbf767 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -38,12 +38,10 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -104,7 +102,6 @@ import static org.hamcrest.Matchers.equalTo;
  * 
  */
 @RunWith(Parameterized.class)
-@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
 enum Topology implements DagCreator {
 PIPELINE {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index a3da7f7733e..319df472ff8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -102,8 +102,16 @@ import static 
org.apache.flink.shaded.guava31.com.google.common.collect.Iterable
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Base class for tests related to unaligned checkpoints. */
-@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
+/**
+ * Base class for tests related to unaligned checkpoints.
+ *
+ * This test base relies on restarting the subtasks within the scheduler to 
trigger a reset of
+ * the operators. The operator reset is counted in the LongSource. The job 
will terminate if the
+ * number of expected restarts is reached. The AdaptiveScheduler won't trigger 
the operator reset
+ * resulting in the test running forever. This is why this test suite is 
disabled for the {@link
+ * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
+ */
+@Category(FailsWithAdaptiveScheduler.class)
 public abstract class UnalignedCheckpointTestBase extends TestLogger {
 protected static final Logger LOG = 
LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
 protected static final String NUM_INPUTS = "inputs_";



(flink) 02/04: [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00492630baa5cf041ea2cce2a3560f3e713bf57a
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 15:47:39 2024 +0100

[FLINK-21450][test] Enables tests that were disabled for the 
AdaptiveScheduler

For a few tests, a proper explanation is added why the tests are still 
disabled
---
 .../flink/connector/file/src/FileSourceTextLinesITCase.java  | 2 --
 flink-end-to-end-tests/run-nightly-tests.sh  | 2 +-
 .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java   | 4 +++-
 .../flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java  | 9 +++--
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 08d53f21426..f0ff3fb5cb2 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
@@ -198,7 +197,6 @@ class FileSourceTextLinesITCase {
  * record format (text lines) and restarts TaskManager.
  */
 @Test
-@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // 
FLINK-21450
 void testContinuousTextFileSourceWithTaskManagerFailover(@TempDir 
java.nio.file.Path tmpTestDir)
 throws Exception {
 // This test will kill TM, so we run it in a new cluster to avoid 
affecting other tests
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index b5000eb5d38..72146e38ecd 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -233,7 +233,7 @@ function run_group_2 {
 # Sticky Scheduling
 

 
-if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450
+if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-34416
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap 
false false 100" "skip_check_exceptions"
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap 
false true 100" "skip_check_exceptions"
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks 
false false 100" "skip_check_exceptions"
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 5a84fb4f0e4..e8e63652283 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1019,7 +1019,9 @@ class JobMasterTest {
  * if this execution fails.
  */
 @Test
-@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // 
FLINK-21450
+// The AdaptiveScheduler doesn't support partial recovery but restarts all 
Executions in case of
+// a local failure.
+@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
 void testRequestNextInputSplitWithLocalFailover() throws Exception {
 
 configuration.set(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
index 44db8abdec6..583e4341e65 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
@@ -60,13 +60,18 @@ public class DefaultSchedulerLocalRecoveryITCase extends 
TestLogger {
 private static final long TIMEOUT = 10_000L;
 
 @Test
-

(flink) 03/04: [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d107966dbe7e38e43680fabf3ffdfeaa71e8d3c
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 15:59:11 2024 +0100

[FLINK-21535][test] Adds proper comment to test methods that are disabled 
for the AdaptiveScheduler
---
 .../flink/test/checkpointing/UnalignedCheckpointITCase.java  |  3 ---
 .../test/checkpointing/UnalignedCheckpointTestBase.java  | 12 ++--
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index effbbc52fc8..f649b8f7df3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -38,12 +38,10 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -105,7 +103,6 @@ import static org.hamcrest.Matchers.equalTo;
  * 
  */
 @RunWith(Parameterized.class)
-@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
 enum Topology implements DagCreator {
 PIPELINE {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index ea664da1772..e88f27fe1ce 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -104,8 +104,16 @@ import static 
org.apache.flink.shaded.guava31.com.google.common.collect.Iterable
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Base class for tests related to unaligned checkpoints. */
-@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689
+/**
+ * Base class for tests related to unaligned checkpoints.
+ *
+ * This test base relies on restarting the subtasks within the scheduler to 
trigger a reset of
+ * the operators. The operator reset is counted in the LongSource. The job 
will terminate if the
+ * number of expected restarts is reached. The AdaptiveScheduler won't trigger 
the operator reset
+ * resulting in the test running forever. This is why this test suite is 
disabled for the {@link
+ * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
+ */
+@Category(FailsWithAdaptiveScheduler.class)
 public abstract class UnalignedCheckpointTestBase extends TestLogger {
 protected static final Logger LOG = 
LoggerFactory.getLogger(UnalignedCheckpointTestBase.class);
 protected static final String NUM_INPUTS = "inputs_";



(flink) branch release-1.19 updated (5ec4bf2f181 -> f82ff7c656d)

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


from 5ec4bf2f181 [FLINK-32513][core] Add predecessor caching
 new 4fc36e9abaa [FLINK-21400][ci] Enables FileSink and Stateful stream job 
e2e test for the AdaptiveScheduler
 new 00492630baa [FLINK-21450][test] Enables tests that were disabled for 
the AdaptiveScheduler
 new 7d107966dbe [FLINK-21535][test] Adds proper comment to test methods 
that are disabled for the AdaptiveScheduler
 new f82ff7c656d [FLINK-34409][ci] Enable any still disabled e2e tests for 
the AdaptiveScheduler

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../file/src/FileSourceTextLinesITCase.java|  2 -
 flink-end-to-end-tests/run-nightly-tests.sh| 60 ++
 .../flink/runtime/jobmaster/JobMasterTest.java |  4 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  3 --
 .../checkpointing/UnalignedCheckpointTestBase.java | 12 -
 .../DefaultSchedulerLocalRecoveryITCase.java   |  9 +++-
 6 files changed, 48 insertions(+), 42 deletions(-)



(flink) branch master updated (3a55a3ff750 -> 1aa35b95975)

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 3a55a3ff750 [FLINK-34902][table] Fix column mismatch 
IndexOutOfBoundsException exception
 add a1d17ccf0ee [FLINK-21400][ci] Enables FileSink and Stateful stream job 
e2e test for the AdaptiveScheduler
 add 8f06fb472ba [FLINK-21450][test] Enables tests that were disabled for 
the AdaptiveScheduler
 add 96142404c14 [FLINK-21535][test] Adds proper comment to test methods 
that are disabled for the AdaptiveScheduler
 add 1aa35b95975 [FLINK-34409][ci] Enable any still disabled e2e tests for 
the AdaptiveScheduler

No new revisions were added by this update.

Summary of changes:
 .../file/src/FileSourceTextLinesITCase.java|  2 -
 flink-end-to-end-tests/run-nightly-tests.sh| 60 ++
 .../flink/runtime/jobmaster/JobMasterTest.java |  4 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  3 --
 .../checkpointing/UnalignedCheckpointTestBase.java | 12 -
 .../DefaultSchedulerLocalRecoveryITCase.java   |  9 +++-
 6 files changed, 48 insertions(+), 42 deletions(-)



(flink) 02/04: [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler

2024-03-25 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5c243097ac9fae29c3365a2361b7b0c6be3b3ee
Author: Matthias Pohl 
AuthorDate: Wed Feb 7 15:47:39 2024 +0100

[FLINK-21450][test] Enables tests that were disabled for the 
AdaptiveScheduler

For a few tests, a proper explanation is added why the tests are still 
disabled
---
 .../flink/connector/file/src/FileSourceTextLinesITCase.java  | 2 --
 flink-end-to-end-tests/run-nightly-tests.sh  | 2 +-
 .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java   | 4 +++-
 .../flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java  | 9 +++--
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 01cbd8aa9c2..f1d65888fd8 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
@@ -167,7 +166,6 @@ class FileSourceTextLinesITCase {
  * record format (text lines) and restarts TaskManager.
  */
 @Test
-@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // 
FLINK-21450
 void testContinuousTextFileSourceWithTaskManagerFailover(@TempDir 
java.nio.file.Path tmpTestDir)
 throws Exception {
 // This test will kill TM, so we run it in a new cluster to avoid 
affecting other tests
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 8389442c29f..f6d45598088 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -231,7 +231,7 @@ function run_group_2 {
 # Sticky Scheduling
 

 
-if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450
+if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-34416
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap 
false false 100" "skip_check_exceptions"
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap 
false true 100" "skip_check_exceptions"
 run_test "Local recovery and sticky scheduling end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks 
false false 100" "skip_check_exceptions"
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 64d462a9090..bc15bcdacbc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1018,7 +1018,9 @@ class JobMasterTest {
  * if this execution fails.
  */
 @Test
-@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // 
FLINK-21450
+// The AdaptiveScheduler doesn't support partial recovery but restarts all 
Executions in case of
+// a local failure.
+@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
 void testRequestNextInputSplitWithLocalFailover() throws Exception {
 
 configuration.setString(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
index 93b55c83081..99a6e3b19fb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
@@ -60,13 +60,18 @@ public class DefaultSchedulerLocalRecoveryITCase extends 
TestLogger {
 private static final long TIMEOUT = 10_000L;
 
 @Test
-

  1   2   3   4   5   6   7   8   9   >