(flink) branch master updated (74864b0b376 -> 502d9673f56)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 74864b0b376 [FLINK-36369][table] Replace all class annotations in the legacy package with Internal in table modules (table-common, table-api-java and table-api-java-bridge) add d2ac6bda28c [FLINK-14068][streaming] Removes non-deprecated API that uses org.apache.flink.streaming.api.windowing.time.Time; add 502d9673f56 [FLINK-14068][core] Fixes docs that was missed in the core module PR of FLINK-14068 No new revisions were added by this update. Summary of changes: README.md | 2 +- .../datastream/event-time/generating_watermarks.md | 6 +- .../docs/dev/datastream/execution/parallel.md | 10 +- .../content.zh/docs/dev/datastream/experimental.md | 4 +- .../docs/dev/datastream/operators/joining.md | 32 ++--- .../docs/dev/datastream/operators/overview.md | 24 ++-- .../docs/dev/datastream/operators/windows.md | 76 +-- docs/content.zh/docs/dev/datastream/overview.md| 8 +- docs/content.zh/docs/learn-flink/event_driven.md | 6 +- .../docs/learn-flink/streaming_analytics.md| 16 +-- docs/content.zh/docs/libs/state_processor_api.md | 6 +- .../docs/ops/state/task_failure_recovery.md| 6 +- .../datastream/event-time/generating_watermarks.md | 6 +- .../docs/dev/datastream/execution/parallel.md | 10 +- docs/content/docs/dev/datastream/experimental.md | 4 +- .../docs/dev/datastream/operators/joining.md | 32 ++--- .../docs/dev/datastream/operators/overview.md | 24 ++-- .../docs/dev/datastream/operators/windows.md | 82 ++-- docs/content/docs/dev/datastream/overview.md | 8 +- docs/content/docs/learn-flink/event_driven.md | 6 +- .../docs/learn-flink/streaming_analytics.md| 14 +-- docs/content/docs/libs/state_processor_api.md | 6 +- .../streaming/tests/PeriodicStreamingJob.java | 4 +- .../tests/DataStreamAllroundTestJobFactory.java| 7 +- .../main/java/org/apache/flink/cep/nfa/NFA.java| 4 +- 
.../java/org/apache/flink/cep/pattern/Pattern.java | 6 +- .../apache/flink/cep/nfa/TimesOrMoreITCase.java| 3 +- .../flink/cep/nfa/compiler/NFACompilerTest.java| 5 +- .../state/api/SavepointWindowReaderITCase.java | 34 ++--- .../state/api/SavepointWriterWindowITCase.java | 18 +-- .../flink/state/api/input/WindowReaderTest.java| 12 +- .../flink/streaming/api/TimeCharacteristic.java| 7 +- .../api/datastream/AllWindowedStream.java | 16 --- .../streaming/api/datastream/CoGroupedStreams.java | 50 +--- .../flink/streaming/api/datastream/DataStream.java | 59 - .../streaming/api/datastream/JoinedStreams.java| 50 +--- .../streaming/api/datastream/KeyedStream.java | 72 --- .../streaming/api/datastream/WindowedStream.java | 16 --- .../assigners/SlidingEventTimeWindows.java | 40 -- .../assigners/SlidingProcessingTimeWindows.java| 40 -- .../assigners/TumblingEventTimeWindows.java| 53 .../assigners/TumblingProcessingTimeWindows.java | 56 - .../flink/streaming/api/windowing/time/Time.java | 139 - .../operators/windowing/WindowOperatorBuilder.java | 7 -- .../apache/flink/streaming/api/TypeFillTest.java | 9 +- .../api/operators/StateDescriptorPassingTest.java | 14 +-- .../windowing/SlidingEventTimeWindowsTest.java | 35 -- .../SlidingProcessingTimeWindowsTest.java | 35 -- .../windowing/TimeWindowTranslationTest.java | 34 +++-- .../windowing/TumblingEventTimeWindowsTest.java| 27 ++-- .../TumblingProcessingTimeWindowsTest.java | 25 ++-- .../BoundedOutOfOrdernessTimestampExtractor.java | 10 -- .../assigners/EventTimeSessionWindows.java | 14 --- .../assigners/ProcessingTimeSessionWindows.java| 14 --- .../api/windowing/evictors/TimeEvictor.java| 26 .../triggers/ContinuousEventTimeTrigger.java | 13 -- .../triggers/ContinuousProcessingTimeTrigger.java | 13 -- .../apache/flink/streaming/api/DataStreamTest.java | 3 +- ...oundedOutOfOrdernessTimestampExtractorTest.java | 11 +- .../windowing/AllWindowTranslationTest.java| 73 +-- .../windowing/ContinuousEventTimeTriggerTest.java | 15 +-- 
.../ContinuousProcessingTimeTriggerTest.java | 14 +-- .../windowing/EventTimeSessionWindowsTest.java | 20 +-- .../windowing/EvictingWindowOperatorTest.java | 13 +- .../operators/windowing/MergingWindowSetTest.java | 18 +-- .../ProcessingTimeSessionWindowsTest.java | 20 +-- .../windowing/WindowOperatorMigrationTest.java | 35 +++--- .../operators/windowing/WindowOperatorTest.java| 76 ++- .../operators/windowing
(flink) 02/02: [FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the ComponentMainThreadExecutor that's used in the corresponding test rather than the test's main thread
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 01d62d3a1958b4bdb4e31ff001d6630e348cd9e4 Author: Matthias Pohl AuthorDate: Mon Sep 23 14:37:26 2024 +0200 [FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the ComponentMainThreadExecutor that's used in the corresponding test rather than the test's main thread This issue was introduced by FLINK-36168 where I added proper shutdown logic --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 43 +++--- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 6b3f7acb19e..3a223cb4b8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -158,7 +158,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; -import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph; @@ -412,7 +411,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = new 
Configuration(); configuration.set( @@ -470,7 +469,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = new Configuration(); configuration.set( @@ -695,7 +694,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = createConfigurationWithNoTimeouts(); configuration.set( @@ -784,7 +783,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); scheduler = new AdaptiveSchedulerBuilder( @@ -805,7 +804,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); @@ -834,7 +833,7 @@ public class AdaptiveSchedulerTest { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = new Configuration(); configuration.set( @@ -929,7 +928,7 @@ public class AdaptiveSchedulerTest { void 
testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = -createDeclarativeSlotPool(jobGraph.getJobID()); +createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor); final Configuration configuration = new Configuration(); configuration.set( @@ -1074,7 +1073,7 @@ public class AdaptiveSchedulerTest { final JobGrap
(flink) 01/02: [hotfix][test] Changes the way the thread name of the ComponentMainThreadExecutorServiceAdapter is created
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b15a704b4f70590a6cefcf3bc6fcd412941717f9 Author: Matthias Pohl AuthorDate: Mon Sep 23 14:25:33 2024 +0200 [hotfix][test] Changes the way the thread name of the ComponentMainThreadExecutorServiceAdapter is created The old implementation generated thread names having the prefix multiple times. --- .../concurrent/ComponentMainThreadExecutorServiceAdapter.java | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java index 690ccecf227..277e2c04627 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java @@ -78,7 +78,13 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT @Nonnull ScheduledExecutorService singleThreadExecutor) { final Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join(); -thread.setName(String.format("ComponentMainThread-%s", thread.getName())); +final String testMainThreadNamePrefix = "ComponentMainThread"; +if (!thread.getName().contains(testMainThreadNamePrefix)) { +// we have to check the current name first because this instance can be reused as a +// ScheduledExecutorService +thread.setName(String.format("%s-%s", testMainThreadNamePrefix, thread.getName())); +} + return new ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread); }
(flink) branch master updated (8bb2f4d7479 -> 01d62d3a195)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 8bb2f4d7479 [FLINK-35411][State] Support process write state requests within coordinator thread new b15a704b4f7 [hotfix][test] Changes the way the thread name of the ComponentMainThreadExecutorServiceAdapter is created new 01d62d3a195 [FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the ComponentMainThreadExecutor that's used in the corresponding test rather than the test's main thread The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../ComponentMainThreadExecutorServiceAdapter.java | 8 +++- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 43 +++--- 2 files changed, 29 insertions(+), 22 deletions(-)
(flink) branch master updated (300e1ea1f2c -> 32dc6c019c0)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 300e1ea1f2c [hotfix] Fix UnifiedSinkMigrationITCase new 93aa8b6389b [FLINK-36295] [runtime] Test AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() waits until the desired parallelism is reached new 8e7f661ece2 [hotfix] UnsupportedOperationException in AdaptiveSchedulerClusterITCase fix new 32dc6c019c0 [hotfix] Additional logs in AdaptiveScheduler components The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scheduler/adaptive/DefaultStateTransitionManager.java | 2 ++ .../org/apache/flink/runtime/scheduler/adaptive/State.java| 9 - .../scheduler/adaptive/AdaptiveSchedulerClusterITCase.java| 11 +++ 3 files changed, 21 insertions(+), 1 deletion(-)
(flink) 02/03: [hotfix] UnsupportedOperationException in AdaptiveSchedulerClusterITCase fix
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 8e7f661ece2354197d4a1e2051df5cb626c19469 Author: Zdenek Tison AuthorDate: Thu Sep 19 10:45:58 2024 +0200 [hotfix] UnsupportedOperationException in AdaptiveSchedulerClusterITCase fix --- .../runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java | 5 + 1 file changed, 5 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java index 45da326b6f9..790601b56d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java @@ -258,6 +258,11 @@ public class AdaptiveSchedulerClusterITCase { public Future notifyCheckpointCompleteAsync(long checkpointId) { return CompletableFuture.completedFuture(null); } + +@Override +public Future notifyCheckpointSubsumedAsync(long checkpointId) { +return CompletableFuture.completedFuture(null); +} } private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
(flink) 03/03: [hotfix] Additional logs in AdaptiveScheduler components
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 32dc6c019c036b7fea4532bf429d00bfc9b75c78 Author: Zdenek Tison AuthorDate: Fri Sep 20 15:33:50 2024 +0200 [hotfix] Additional logs in AdaptiveScheduler components --- .../scheduler/adaptive/DefaultStateTransitionManager.java| 2 ++ .../java/org/apache/flink/runtime/scheduler/adaptive/State.java | 9 - 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java index da3ccab27c2..4c7be2ed349 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java @@ -109,11 +109,13 @@ public class DefaultStateTransitionManager implements StateTransitionManager { @Override public void onChange() { +LOG.debug("OnChange event received in phase {}.", getPhase()); phase.onChange(); } @Override public void onTrigger() { +LOG.debug("OnTrigger event received in phase {}.", getPhase()); phase.onTrigger(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java index 2a4bb966042..7d7de39a439 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java @@ -103,7 +103,14 @@ interface State extends LabeledGlobalFailureHandler { Class clazz, ThrowingConsumer action, String debugMessage) throws E { tryRun( clazz, -action, +x -> { +getLogger() +.debug( +"Running '{}' in state {}.", +debugMessage, 
+this.getClass().getSimpleName()); +ThrowingConsumer.unchecked(action).accept(x); +}, logger -> logger.debug( "Cannot run '{}' because the actual state is {} and not {}.",
(flink) 01/03: [FLINK-36295] [runtime] Test AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() waits until the desired parallelism is reached
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 93aa8b6389b87d68464126f6c53b62fcfcc7f231 Author: Zdenek Tison AuthorDate: Thu Sep 19 10:32:19 2024 +0200 [FLINK-36295] [runtime] Test AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale() waits until the desired parallelism is reached --- .../runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java | 6 ++ 1 file changed, 6 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java index 2a688f12d6d..45da326b6f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java @@ -187,6 +187,12 @@ public class AdaptiveSchedulerClusterITCase { miniCluster.submitJob(jobGraph).join(); +// wait until the desired parallelism is reached +waitUntilParallelismForVertexReached( +jobGraph.getJobID(), +JOB_VERTEX_ID, +NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS); + // wait until some checkpoints have been completed CommonTestUtils.waitUntilCondition( () ->
(flink) branch master updated: [FLINK-36015] [runtime] Align rescale parameters
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 6ab3c3d9881 [FLINK-36015] [runtime] Align rescale parameters 6ab3c3d9881 is described below commit 6ab3c3d9881b833278e29d205c0bca1acf575be5 Author: Zdenek Tison AuthorDate: Wed Sep 11 16:07:13 2024 +0200 [FLINK-36015] [runtime] Align rescale parameters --- .../generated/all_jobmanager_section.html | 34 +++ .../generated/expert_scheduling_section.html | 34 +++ .../generated/job_manager_configuration.html | 34 +++ .../flink/configuration/JobManagerOptions.java | 102 +++ .../scheduler/adaptive/AdaptiveScheduler.java | 109 +++-- .../scheduler/adaptive/WaitingForResources.java| 21 ++-- .../runtime/minicluster/MiniClusterITCase.java | 6 +- .../adaptive/AdaptiveSchedulerClusterITCase.java | 10 +- .../adaptive/AdaptiveSchedulerSimpleITCase.java| 3 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 82 ++-- .../test/checkpointing/AutoRescalingITCase.java| 4 +- ...tractTaskManagerProcessFailureRecoveryTest.java | 4 +- .../TaskManagerDisconnectOnShutdownITCase.java | 4 +- .../test/scheduling/RescaleOnCheckpointITCase.java | 6 +- .../java/org/apache/flink/yarn/YarnTestBase.java | 3 +- 15 files changed, 284 insertions(+), 172 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index bcb61cd1849..d1df33a72f3 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -8,6 +8,12 @@ + + jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling +30 s +Duration +Determines the minimum time between scaling operations. 
+ jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout 1 min @@ -15,35 +21,29 @@ Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.The resource requirements of a running job can be changed using the 2 +Integer +The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint. + + + jobmanager.adaptive-scheduler.rescale-trigger.max-delay (none) Duration -The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled). +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures if checkpointing is enabled). - jobmanager.adaptive-scheduler.resource-stabilization-timeout + jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout 10 s Duration -The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.If scheduler-mode is configured to REACTIVE, this configuration value will defaul [...] 
+The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available during job submission. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.If scheduler-mode is configured to REACTIVE, this configura [...] - jobmanager.adaptive-scheduler.resource-wait-timeout + jobmanager.adaptive-scheduler.submission.resource-wait-timeout 5 min Duration The maximum time the JobManager will wait to acquir
(flink) branch master updated: [FLINK-36016] [runtime] Synchronize initialization time and clock usage in DefaultStateTransitionManager
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 78c19a1278e [FLINK-36016] [runtime] Synchronize initialization time and clock usage in DefaultStateTransitionManager 78c19a1278e is described below commit 78c19a1278edf54f679b196ec5ef132dfd4b63e4 Author: Zdenek Tison AuthorDate: Tue Sep 10 13:25:14 2024 +0200 [FLINK-36016] [runtime] Synchronize initialization time and clock usage in DefaultStateTransitionManager --- .../scheduler/adaptive/AdaptiveScheduler.java | 19 +- .../adaptive/DefaultStateTransitionManager.java| 27 +++--- .../runtime/scheduler/adaptive/Executing.java | 17 - .../scheduler/adaptive/AdaptiveSchedulerTest.java | 9 ++--- .../DefaultStateTransitionManagerTest.java | 5 ++- .../runtime/scheduler/adaptive/ExecutingTest.java | 41 +- 6 files changed, 46 insertions(+), 72 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 60c605e559f..d993d1d04aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -151,6 +151,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import static org.apache.flink.configuration.JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER; import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking; @@ -192,16 +193,16 @@ public class AdaptiveScheduler * * @see * 
DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context, - * Duration, Duration, Duration, Temporal) + * Supplier, Duration, Duration, Duration) */ @FunctionalInterface interface StateTransitionManagerFactory { StateTransitionManager create( StateTransitionManager.Context context, +Supplier clock, Duration cooldownTimeout, Duration resourceStabilizationTimeout, -Duration maximumDelayForTrigger, -Temporal lastStateTransition); +Duration maximumDelayForTrigger); } /** @@ -411,6 +412,8 @@ public class AdaptiveScheduler private final JobFailureMetricReporter jobFailureMetricReporter; private final boolean reportEventsAsSpans; +private final Supplier clock = Instant::now; + public AdaptiveScheduler( Settings settings, JobGraph jobGraph, @@ -1155,10 +1158,10 @@ public class AdaptiveScheduler StateTransitionManager.Context ctx) { return stateTransitionManagerFactory.create( ctx, +clock, Duration.ZERO, // skip cooldown phase settings.getResourceStabilizationTimeout(), -Duration.ZERO, // trigger immediately once the stabilization phase is over -Instant.now()); +Duration.ZERO); // trigger immediately once the stabilization phase is over } private void declareDesiredResources() { @@ -1194,13 +1197,13 @@ public class AdaptiveScheduler } private StateTransitionManager createExecutingStateTransitionManager( -StateTransitionManager.Context ctx, Instant lastRescaleTimestamp) { +StateTransitionManager.Context ctx) { return stateTransitionManagerFactory.create( ctx, +clock, settings.getScalingIntervalMin(), settings.getScalingResourceStabilizationTimeout(), -settings.getMaximumDelayForTriggeringRescale(), -lastRescaleTimestamp); +settings.getMaximumDelayForTriggeringRescale()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java index 27bfcfd183e..da3ccab27c2 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java @@ -28,7 +28,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.time.Duration; -import java.time.Instant; import
(flink) branch master updated (0d3be65c06b -> 15361471676)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 0d3be65c06b [hotfix][test] Adds back timestamp to log output add 15361471676 [FLINK-14068][core] Removes deprecated org.apache.flink.api.common.time.Time (#25250) No new revisions were added by this update. Summary of changes: .../docs/dev/datastream/fault-tolerance/state.md | 16 +- .../docs/ops/state/task_failure_recovery.md| 14 +- .../docs/dev/datastream/fault-tolerance/state.md | 18 +-- .../docs/ops/state/task_failure_recovery.md| 14 +- .../ApplicationDispatcherBootstrap.java| 8 +- .../deployment/application/EmbeddedJobClient.java | 8 +- .../application/JobStatusPollingUtils.java | 8 +- .../application/executors/EmbeddedExecutor.java| 7 +- .../executors/EmbeddedExecutorFactory.java | 6 +- .../client/program/rest/RestClusterClient.java | 3 +- .../connector/base/sink/AsyncSinkBaseITCase.java | 5 +- .../file/sink/BatchExecutionFileSinkITCase.java| 5 +- .../sink/StreamingExecutionFileSinkITCase.java | 4 +- .../file/sink/writer/FileWriterBucketTest.java | 6 +- .../wikiedits/WikipediaEditsSourceTest.java| 8 +- .../common/restartstrategy/RestartStrategies.java | 134 +--- .../flink/api/common/state/StateTtlConfig.java | 33 .../org/apache/flink/api/common/time/Time.java | 172 - .../flink/configuration/ConfigurationUtils.java| 14 +- .../main/java/org/apache/flink/util/TimeUtils.java | 38 + .../flink/api/common/ExecutionConfigTest.java | 4 +- .../eventtime/WatermarksWithIdlenessTest.java | 2 +- .../api/common/state/StateDescriptorTest.java | 4 +- .../flink/api/common/state/StateTtlConfigTest.java | 10 +- .../org/apache/flink/testutils/TestingUtils.java | 7 +- .../flink/util/TimeUtilsPrettyPrintingTest.java| 3 +- .../java/org/apache/flink/util/TimeUtilsTest.java | 11 -- .../flink/connector/file/sink/FileSinkProgram.java | 6 +- .../flink/sql/tests/StreamSQLTestProgram.java | 6 +- 
.../flink/streaming/tests/TtlTestConfig.java | 11 +- .../tests/verify/AbstractTtlStateVerifier.java | 2 +- ...HighAvailabilityRecoverFromSavepointITCase.java | 2 +- flink-python/pyflink/common/restart_strategy.py| 16 +- .../datastream/connectors/tests/test_kafka.py | 2 + .../pyflink/fn_execution/embedded/java_utils.py| 4 +- flink-python/pyflink/table/table_config.py | 6 +- flink-python/pyflink/util/java_utils.py| 16 +- .../org/apache/flink/python/util/ProtoUtils.java | 4 +- .../flink/streaming/api/utils/ProtoUtilsTest.java | 6 +- .../flink/runtime/rpc/pekko/PekkoRpcActorTest.java | 9 +- .../runtime/rpc/pekko/TimeoutCallStackTest.java| 11 +- .../apache/flink/runtime/rpc/RpcGatewayUtils.java | 8 +- .../org/apache/flink/runtime/rpc/RpcUtils.java | 3 +- .../runtime/webmonitor/WebSubmissionExtension.java | 6 +- .../webmonitor/handlers/JarDeleteHandler.java | 4 +- .../webmonitor/handlers/JarListHandler.java| 4 +- .../webmonitor/handlers/JarPlanHandler.java| 6 +- .../runtime/webmonitor/handlers/JarRunHandler.java | 4 +- .../webmonitor/handlers/JarUploadHandler.java | 4 +- .../webmonitor/LeaderRetrievalHandlerTest.java | 17 +- .../runtime/webmonitor/WebMonitorUtilsTest.java| 4 +- .../webmonitor/WebSubmissionExtensionTest.java | 5 +- .../webmonitor/handlers/JarDeleteHandlerTest.java | 4 +- .../handlers/JarHandlerParameterTest.java | 6 +- .../runtime/webmonitor/handlers/JarHandlers.java | 4 +- .../handlers/JarRunHandlerParameterTest.java | 3 +- .../webmonitor/handlers/JarUploadHandlerTest.java | 4 +- .../flink/runtime/dispatcher/Dispatcher.java | 54 +++ .../DispatcherCachedOperationsHandler.java | 10 +- .../runtime/dispatcher/DispatcherGateway.java | 16 +- .../runtime/dispatcher/DispatcherRestEndpoint.java | 6 +- .../dispatcher/FileExecutionGraphInfoStore.java| 12 +- .../dispatcher/MemoryExecutionGraphInfoStore.java | 12 +- .../flink/runtime/dispatcher/MiniDispatcher.java | 8 +- .../dispatcher/TriggerCheckpointFunction.java | 4 +- .../dispatcher/TriggerSavepointFunction.java 
| 4 +- .../cleanup/CheckpointResourcesCleanupRunner.java | 10 +- .../runtime/entrypoint/ClusterEntrypoint.java | 8 +- .../entrypoint/SessionClusterEntrypoint.java | 6 +- .../executiongraph/DefaultExecutionGraph.java | 6 +- .../DefaultExecutionGraphBuilder.java | 4 +- .../flink/runtime/executiongraph/Execution.java| 6 +- .../runtime/executiongraph/ExecutionJobVertex.java | 6 +- .../runtime/executiongraph/ExecutionVertex.java| 6
(flink) branch master updated (ed7f57c9ccb -> 0d3be65c06b)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from ed7f57c9ccb [FLINK-36258][config] set `HADOOP_CONF_DIR` to config hadoop in YarnFileStageTestS3ITCase new 0731f4f1e12 [FLINK-36279][runtime] Making desired resources being calculated based on all available slots for the job new 621ce3248a9 [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase new 0d3be65c06b [hotfix][test] Adds back timestamp to log output The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scheduler/adaptive/AdaptiveScheduler.java | 4 +-- .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++--- .../src/test/resources/log4j2-test.properties | 2 +- 3 files changed, 36 insertions(+), 8 deletions(-)
(flink) 02/03: [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 621ce3248a927eab2886dd0c3e557fffc878be05 Author: Matthias Pohl AuthorDate: Mon Sep 16 12:10:09 2024 +0200 [hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase --- .../test/scheduling/RescaleOnCheckpointITCase.java | 38 +++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java index 60c2b19cca1..4febfe453cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,6 +41,8 @@ import org.apache.flink.util.TestLoggerExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Iterator; @@ -51,6 +54,8 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(TestLoggerExtension.class) class RescaleOnCheckpointITCase { +private static final Logger LOG = LoggerFactory.getLogger(RescaleOnCheckpointITCase.class); + // Scaling down is used here 
because scaling up is not supported by the NumberSequenceSource // that's used in this test. private static final int NUMBER_OF_SLOTS = 4; @@ -111,34 +116,59 @@ class RescaleOnCheckpointITCase { assertThat(jobVertexIterator.hasNext()) .as("There needs to be at least one JobVertex.") .isTrue(); +final JobVertexID jobVertexId = jobVertexIterator.next().getID(); final JobResourceRequirements jobResourceRequirements = JobResourceRequirements.newBuilder() -.setParallelismForJobVertex( -jobVertexIterator.next().getID(), 1, AFTER_RESCALE_PARALLELISM) +.setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM) .build(); assertThat(jobVertexIterator.hasNext()) .as("This test expects to have only one JobVertex.") .isFalse(); restClusterClient.submitJob(jobGraph).join(); + +final JobID jobId = jobGraph.getJobID(); try { -final JobID jobId = jobGraph.getJobID(); +LOG.info( +"Waiting for job {} to reach parallelism of {} for vertex {}.", +jobId, +BEFORE_RESCALE_PARALLELISM, +jobVertexId); waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); +LOG.info( +"Job {} reached parallelism of {} for vertex {}. 
Updating the vertex parallelism next to {}.", +jobId, +BEFORE_RESCALE_PARALLELISM, +jobVertexId, +AFTER_RESCALE_PARALLELISM); restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join(); // timeout to allow any unexpected rescaling to happen anyway Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis()); // verify that the previous timeout didn't result in a change of parallelism +LOG.info( +"Checking that job {} hasn't changed its parallelism even after some delay, yet.", +jobId); waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); miniCluster.triggerCheckpoint(jobId); +LOG.info( +"Waiting for job {} to reach parallelism of {} for vertex {}.", +jobId, +AFTER_RESCALE_PARALLELISM, +jobVertexId); waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM); -waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM); +final int expectedFreeSlot
(flink) 01/03: [FLINK-36279][runtime] Making desired resources being calculated based on all available slots for the job
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0731f4f1e122fb4e675464380300e7ffdd595d46 Author: Matthias Pohl AuthorDate: Mon Sep 16 12:09:35 2024 +0200 [FLINK-36279][runtime] Making desired resources being calculated based on all available slots for the job FLINK-36014 aligned the triggering of the CreateExecutionGraph state within WaitingForResources and Executing state. Before that change, only WaitingForResources relied on this method. Relying on free slots was good enough because in WaitingForResources state, there are no slots allocated, yet. Using this method for Executing state now as well changes this premise because there are slots allocated while checking the slot availability that would become available after the restart. Hence, considering these slots in the slot availability check is good enough. This will not break the premise for the WaitingForResources state. --- .../apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java| 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 5686c0e47ed..60c605e559f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1078,9 +1078,7 @@ public class AdaptiveScheduler @Override public boolean hasDesiredResources() { -final Collection freeSlots = - declarativeSlotPool.getFreeSlotTracker().getFreeSlotsInformation(); -return hasDesiredResources(desiredResources, freeSlots); +return hasDesiredResources(desiredResources, declarativeSlotPool.getAllSlotsInformation()); } @VisibleForTesting
(flink) 03/03: [hotfix][test] Adds back timestamp to log output
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0d3be65c06b7e8afa90e00628bc1c325953c597f Author: Matthias Pohl AuthorDate: Mon Sep 16 12:45:32 2024 +0200 [hotfix][test] Adds back timestamp to log output This was accidentally removed with the changes of FLINK-34417 --- flink-tests/src/test/resources/log4j2-test.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-tests/src/test/resources/log4j2-test.properties b/flink-tests/src/test/resources/log4j2-test.properties index 843e105b0ea..4dcd8178762 100644 --- a/flink-tests/src/test/resources/log4j2-test.properties +++ b/flink-tests/src/test/resources/log4j2-test.properties @@ -28,7 +28,7 @@ appender.testlogger.name = TestLogger appender.testlogger.type = CONSOLE appender.testlogger.target = SYSTEM_ERR appender.testlogger.layout.type = PatternLayout -appender.testlogger.layout.pattern = [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n +appender.testlogger.layout.pattern = %-4r [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n logger.migration.name = org.apache.flink.test.migration logger.migration.level = INFO
(flink) branch master updated (8dc40ab70c9 -> 6a5fa9962a9)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248) new b79cf32cebe [hotfix] Fixes JobExceptionsInfoWithHistory equals method new 6a5fa9962a9 [FLINK-36147][runtime] Removes deprecated location field (was deprecated in 1.19) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/test/resources/rest_api_v1.snapshot| 6 .../rest/handler/job/JobExceptionsHandler.java | 3 -- .../messages/JobExceptionsInfoWithHistory.java | 41 ++ .../rest/handler/job/JobExceptionsHandlerTest.java | 14 .../JobExceptionsInfoWithHistoryNoRootTest.java| 2 -- 5 files changed, 10 insertions(+), 56 deletions(-)
(flink) 01/02: [hotfix] Fixes JobExceptionsInfoWithHistory equals method
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b79cf32cebeb5044dcc339cc52a47b40a5498fd6 Author: Matthias Pohl AuthorDate: Fri Aug 23 22:27:45 2024 +0200 [hotfix] Fixes JobExceptionsInfoWithHistory equals method --- .../flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java index fbf17723f6e..1ca124cf01d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java @@ -287,7 +287,7 @@ public class JobExceptionsInfoWithHistory implements ResponseBody { && Objects.equals(failureLabels, that.failureLabels) && Objects.equals(taskName, that.taskName) && Objects.equals(location, that.location) -&& Objects.equals(location, that.endpoint); +&& Objects.equals(endpoint, that.endpoint); } @Override
(flink) 02/02: [FLINK-36147][runtime] Removes deprecated location field (was deprecated in 1.19)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 6a5fa9962a90b663209a9eb2e910290b55eb97e1 Author: Matthias Pohl AuthorDate: Fri Aug 23 22:32:26 2024 +0200 [FLINK-36147][runtime] Removes deprecated location field (was deprecated in 1.19) --- .../src/test/resources/rest_api_v1.snapshot| 6 .../rest/handler/job/JobExceptionsHandler.java | 3 -- .../messages/JobExceptionsInfoWithHistory.java | 39 ++ .../rest/handler/job/JobExceptionsHandlerTest.java | 14 .../JobExceptionsInfoWithHistoryNoRootTest.java| 2 -- 5 files changed, 9 insertions(+), 55 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 995662900eb..9e9fe94da6a 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1989,9 +1989,6 @@ "taskName" : { "type" : "string" }, - "location" : { -"type" : "string" - }, "endpoint" : { "type" : "string" }, @@ -2022,9 +2019,6 @@ "taskName" : { "type" : "string" }, -"location" : { - "type" : "string" -}, "endpoint" : { "type" : "string" }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index c8580fa753b..ac00137c630 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -195,7 +195,6 @@ public class JobExceptionsHandler historyEntry.getFailureLabels(), historyEntry.getFailingTaskName(), toString(historyEntry.getTaskManagerLocation()), -toString(historyEntry.getTaskManagerLocation()), toTaskManagerId(historyEntry.getTaskManagerLocation()), concurrentExceptions); } 
@@ -211,7 +210,6 @@ public class JobExceptionsHandler exceptionHistoryEntry.getFailureLabels(), null, null, -null, null); } @@ -224,7 +222,6 @@ public class JobExceptionsHandler exceptionHistoryEntry.getFailureLabels(), exceptionHistoryEntry.getFailingTaskName(), toString(exceptionHistoryEntry.getTaskManagerLocation()), -toString(exceptionHistoryEntry.getTaskManagerLocation()), toTaskManagerId(exceptionHistoryEntry.getTaskManagerLocation())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java index 1ca124cf01d..f1253592e1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java @@ -154,7 +154,6 @@ public class JobExceptionsInfoWithHistory implements ResponseBody { public static final String FIELD_NAME_EXCEPTION_STACKTRACE = "stacktrace"; public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = "timestamp"; public static final String FIELD_NAME_TASK_NAME = "taskName"; -@Deprecated public static final String FIELD_NAME_LOCATION = "location"; public static final String FIELD_NAME_ENDPOINT = "endpoint"; public static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId"; public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels"; @@ -173,13 +172,6 @@ public class JobExceptionsInfoWithHistory implements ResponseBody { @Nullable private final String taskName; -/** @deprecated Use {@link ExceptionInfo#endpoint} instead. */ -@Deprecated -@JsonInclude(NON_NULL) -@JsonProperty(FIELD_NAME_LOCATION) -@Nullable -private final String location; -
(flink) branch master updated: [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8dc40ab70c9 [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248) 8dc40ab70c9 is described below commit 8dc40ab70c991d67949d3498b26dd6661e816312 Author: Matthias Pohl AuthorDate: Thu Sep 12 19:24:02 2024 +0200 [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. (#25248) --- .../src/test/resources/rest_api_v1.snapshot| 36 .../rest/handler/job/JobExceptionsHandler.java | 47 - .../runtime/rest/messages/JobExceptionsInfo.java | 230 - .../messages/JobExceptionsInfoWithHistory.java | 32 +-- .../rest/handler/job/JobExceptionsHandlerTest.java | 72 --- .../JobExceptionsInfoWithHistoryNoRootTest.java| 22 -- .../messages/JobExceptionsInfoWithHistoryTest.java | 22 -- 7 files changed, 40 insertions(+), 421 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 6fcee2ff9c1..995662900eb 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1961,42 +1961,6 @@ "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory", "properties" : { -"root-exception" : { - "type" : "string" -}, -"timestamp" : { - "type" : "integer" -}, -"all-exceptions" : { - "type" : "array", - "items" : { -"type" : "object", -"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo:ExecutionExceptionInfo", -"properties" : { - "exception" : { -"type" : "string" - }, - "task" : { -"type" : "string" - }, - "location" : { -"type" : "string" - }, - "endpoint" : { -"type" : "string" - }, - "timestamp" : { -"type" : "integer" - }, - "taskManagerId" : { -"type" : "string" - } -} - } -}, -"truncated" : { - 
"type" : "boolean" -}, "exceptionHistory" : { "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index 6d5f49d55b4..c8580fa753b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -20,15 +20,9 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -56,7 +50,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Executor; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -130,47 +123,7 @@ public class JobExceptionsHandler Executio
(flink) branch master updated: [FLINK-36207][build] Disables deprecated API from japicmp
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 625c6774fc1 [FLINK-36207][build] Disables deprecated API from japicmp 625c6774fc1 is described below commit 625c6774fc187aed9f875abb421dd82d6cd10548 Author: Matthias Pohl AuthorDate: Tue Sep 3 21:33:02 2024 +0200 [FLINK-36207][build] Disables deprecated API from japicmp The japicmp plugin doesn't seem to work well with Scala annotations (more specifically @scala.deprecated). But we're planning to remove the Scala code as part of the 2.0 release, anyway. Hence, excluding all *scala files should be good enough. --- pom.xml | 4 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index 777fea41f31..fb56ce20f60 100644 --- a/pom.xml +++ b/pom.xml @@ -2332,6 +2332,8 @@ under the License. + @java.lang.Deprecated + *.scala @org.apache.flink.annotation.Experimental @org.apache.flink.annotation.PublicEvolving @org.apache.flink.annotation.Internal @@ -2341,6 +2343,8 @@ under the License. org.apache.flink.configuration.Configuration#setBytes(java.lang.String,byte[]) org.apache.flink.configuration.ConfigConstants + + org.apache.flink.api.common.eventtime.WatermarksWithIdleness org.apache.flink.api.java.tuple.* org.apache.flink.types.NullFieldException
(flink) branch master updated: [FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 83be264569d [FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305) 83be264569d is described below commit 83be264569d3d8c66dc7b82c062e65f34e35d119 Author: Matthias Pohl AuthorDate: Thu Sep 12 17:44:32 2024 +0200 [FLINK-36168][runtime] Replaces goToFinished with the cancel method (#25305) We have to transition to all the expected state transitions properly to handle all the cleanup. This also requires proper handling of the state transitions by the DummyState test implementations. --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 89 -- .../adaptive/WaitingForResourcesTest.java | 15 ++-- 2 files changed, 57 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 49a58b286ac..c09fbfadcb1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -143,6 +143,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -206,24 +207,40 @@ public class AdaptiveSchedulerTest { } private static void closeInExecutorService( -@Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor executor) { +@Nullable AdaptiveScheduler scheduler, Executor executor) { if 
(scheduler != null) { final CompletableFuture closeFuture = new CompletableFuture<>(); executor.execute( () -> { try { -// no matter what state the scheduler is in; we have to go to Finished -// state to please the Preconditions of the close call -if (scheduler.getState().getClass() != Finished.class) { -scheduler.goToFinished( -scheduler.getArchivedExecutionGraph( -JobStatus.CANCELED, null)); -} +scheduler.cancel(); + FutureUtils.forward(scheduler.closeAsync(), closeFuture); } catch (Throwable t) { closeFuture.completeExceptionally(t); } }); + +// we have to wait for the job termination outside the main thread because the +// cancellation tasks are scheduled on the main thread as well. +scheduler +.getJobTerminationFuture() +.whenCompleteAsync( +(jobStatus, error) -> { +assertThat(scheduler.getState().getClass()) +.isEqualTo(Finished.class); + +if (error != null) { +closeFuture.completeExceptionally(error); +} else { +try { + FutureUtils.forward(scheduler.closeAsync(), closeFuture); +} catch (Throwable t) { +closeFuture.completeExceptionally(t); +} +} +}, +executor); assertThatFuture(closeFuture).eventuallySucceeds(); } } @@ -310,7 +327,7 @@ public class AdaptiveSchedulerTest { final State state = scheduler.getState(); assertThat(scheduler.isState(state)).isTrue(); -assertThat(scheduler.isState(new DummyState())).isFalse(); +assertThat(scheduler.isState(new DummyState(scheduler))).isFalse(); } @Test @@ -337,7 +354,7 @@ public class AdaptiveSchedulerTest { .build(); AtomicBoolean ran = new AtomicBoolean(false); -scheduler.runIfState(new DummyState(), () -> ran.set(true)); +scheduler.runIfState(new DummyState(scheduler), () -> ran.set(true)); assertThat(ran.get()).isFalse();
(flink) branch master updated (740a5674fd9 -> 77da0414929)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 740a5674fd9 [FLINK-34168][config] Refactor callers that use deprecated get/setXXX (#25301) new c43e7915ffe [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state new 77da0414929 [hotfix] Added ExecutionGraph validation while transiting to WaitingForResources state in RestartingTest The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scheduler/adaptive/AdaptiveScheduler.java | 2 + .../runtime/scheduler/adaptive/Executing.java | 1 + .../scheduler/adaptive/FailureResultUtil.java | 1 + .../runtime/scheduler/adaptive/Restarting.java | 32 +++--- .../scheduler/adaptive/StateTransitions.java | 4 ++ .../runtime/scheduler/adaptive/ExecutingTest.java | 36 +-- .../runtime/scheduler/adaptive/RestartingTest.java | 70 +- .../scheduler/adaptive/StopWithSavepointTest.java | 10 +++- 8 files changed, 124 insertions(+), 32 deletions(-)
(flink) 02/02: [hotfix] Added ExecutionGraph validation while transiting to WaitingForResources state in RestartingTest
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 77da0414929437687d7c19df18200a2528b6c27c Author: Zdenek Tison AuthorDate: Fri Sep 6 08:26:38 2024 +0200 [hotfix] Added ExecutionGraph validation while transiting to WaitingForResources state in RestartingTest --- .../apache/flink/runtime/scheduler/adaptive/RestartingTest.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index 77d9c8a2373..346f09b00eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -191,7 +191,7 @@ class RestartingTest { private final StateValidator cancellingStateValidator = new StateValidator<>("Cancelling"); -private final StateValidator waitingForResourcesStateValidator = +private final StateValidator waitingForResourcesStateValidator = new StateValidator<>("WaitingForResources"); private final StateValidator creatingExecutionGraphStateValidator = @@ -202,7 +202,7 @@ class RestartingTest { } public void setExpectWaitingForResources() { -waitingForResourcesStateValidator.expectInput((none) -> {}); +waitingForResourcesStateValidator.expectInput(assertNonNull()); } public void setExpectCreatingExecutionGraph() { @@ -225,8 +225,8 @@ class RestartingTest { public void archiveFailure(RootExceptionHistoryEntry failure) {} @Override -public void goToWaitingForResources(ExecutionGraph previousExecutionGraph) { -waitingForResourcesStateValidator.validateInput(null); +public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) { + waitingForResourcesStateValidator.validateInput(previousExecutionGraph); 
hadStateTransition = true; }
(flink) 01/02: [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c43e7915ffe2932a266e6aa2294724bb39a603d1 Author: Zdenek Tison AuthorDate: Mon Aug 12 13:43:53 2024 +0200 [FLINK-36013] [runtime] Introduce the transition from Restarting to CreatingExecutionGraph state --- .../scheduler/adaptive/AdaptiveScheduler.java | 2 + .../runtime/scheduler/adaptive/Executing.java | 1 + .../scheduler/adaptive/FailureResultUtil.java | 1 + .../runtime/scheduler/adaptive/Restarting.java | 32 +++ .../scheduler/adaptive/StateTransitions.java | 4 ++ .../runtime/scheduler/adaptive/ExecutingTest.java | 36 +++-- .../runtime/scheduler/adaptive/RestartingTest.java | 62 +- .../scheduler/adaptive/StopWithSavepointTest.java | 10 +++- 8 files changed, 120 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 7ef897936d2..5686c0e47ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1229,6 +1229,7 @@ public class AdaptiveScheduler ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, +boolean forcedRestart, List failureCollection) { for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { @@ -1249,6 +1250,7 @@ public class AdaptiveScheduler operatorCoordinatorHandler, LOG, backoffTime, +forcedRestart, userCodeClassLoader, failureCollection)); numRestarts++; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 
bebc8a4..6139767a5c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -156,6 +156,7 @@ class Executing extends StateWithExecutionGraph getExecutionGraphHandler(), getOperatorCoordinatorHandler(), Duration.ofMillis(0L), +true, getFailures()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java index eb2c64eae99..bc16cafbf14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/FailureResultUtil.java @@ -30,6 +30,7 @@ public class FailureResultUtil { sweg.getExecutionGraphHandler(), sweg.getOperatorCoordinatorHandler(), failureResult.getBackoffTime(), +false, sweg.getFailures()); } else { sweg.getLogger().info("Failing job.", failureResult.getFailureCause()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index f647967edb4..1dd3f29778f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -42,7 +42,9 @@ class Restarting extends StateWithExecutionGraph { private final Duration backoffTime; -@Nullable private ScheduledFuture goToWaitingForResourcesFuture; +@Nullable private ScheduledFuture goToSubsequentStateFuture; + +private final boolean forcedRestart; Restarting( Context context, @@ -51,6 +53,7 @@ class Restarting extends StateWithExecutionGraph { OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Duration backoffTime, +boolean 
forcedRestart, ClassLoader userCodeClassLoader, List failureCollection) { super( @@ -63,14 +66,15 @@ class Restarting extends StateWithExecutionGraph { failureCollection); this.context = context; this.backoffTime = backoffTime; +this.forcedRestart = for
(flink) branch master updated (1d5b214eb68 -> db682b9869d)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 1d5b214eb68 [FLINK-36246] Move async state related operators flink-runtime (#25306) add db682b9869d [FLINK-36014] [runtime] Align the desired and sufficient resources definition in Executing and WaitForResources states No new revisions were added by this update. Summary of changes: .../generated/all_jobmanager_section.html | 18 ++--- .../generated/expert_scheduling_section.html | 18 ++--- .../generated/job_manager_configuration.html | 18 ++--- .../flink/configuration/JobManagerOptions.java | 61 +--- .../scheduler/adaptive/AdaptiveScheduler.java | 51 +- .../adaptive/DefaultStateTransitionManager.java| 24 +++ .../runtime/scheduler/adaptive/Executing.java | 50 +++-- .../EnforceMinimalIncreaseRescalingController.java | 48 - ...nforceParallelismChangeRescalingController.java | 41 --- .../scalingpolicy/RescalingController.java | 38 -- .../adaptive/AdaptiveSchedulerClusterITCase.java | 3 + .../scheduler/adaptive/AdaptiveSchedulerTest.java | 29 .../DefaultStateTransitionManagerTest.java | 60 .../runtime/scheduler/adaptive/ExecutingTest.java | 82 -- .../adaptive/StateTrackingMockExecutionGraph.java | 5 +- ...orceMinimalIncreaseRescalingControllerTest.java | 69 -- ...ceParallelismChangeRescalingControllerTest.java | 61 17 files changed, 256 insertions(+), 420 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingController.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceParallelismChangeRescalingController.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/RescalingController.java delete mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceMinimalIncreaseRescalingControllerTest.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/EnforceParallelismChangeRescalingControllerTest.java
(flink) branch master updated: [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1f303a218f4 [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state 1f303a218f4 is described below commit 1f303a218f47aa1a8eaf1323fc2a5debe914762d Author: Zdenek Tison AuthorDate: Fri Jul 19 11:00:10 2024 +0200 [FLINK-36012] [runtime] Integrate StateTransitionManager into WaitingForResources state --- .../scheduler/adaptive/AdaptiveScheduler.java | 50 +++- .../adaptive/DefaultStateTransitionManager.java| 88 +++ .../runtime/scheduler/adaptive/Executing.java | 12 +- .../scheduler/adaptive/StateTransitionManager.java | 11 - .../scheduler/adaptive/WaitingForResources.java| 109 - .../adaptive/AdaptiveSchedulerBuilder.java | 8 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 193 +--- .../DefaultStateTransitionManagerTest.java | 34 +-- .../runtime/scheduler/adaptive/ExecutingTest.java | 51 +++-- .../adaptive/TestingStateTransitionManager.java| 39 ++-- .../adaptive/WaitingForResourcesTest.java | 255 - 11 files changed, 445 insertions(+), 405 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index ebcfc369c82..b5b98984a11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -136,6 +136,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.Temporal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -185,6 +187,24 
@@ public class AdaptiveScheduler private static final Logger LOG = LoggerFactory.getLogger(AdaptiveScheduler.class); +/** + * Named callback interface for creating {@code StateTransitionManager} instances. This internal + * interface allows for easier testing of the parameter injection in a unit test. + * + * @see + * DefaultStateTransitionManager#DefaultStateTransitionManager(StateTransitionManager.Context, + * Duration, Duration, Duration, Temporal) + */ +@FunctionalInterface +interface StateTransitionManagerFactory { +StateTransitionManager create( +StateTransitionManager.Context context, +Duration cooldownTimeout, +@Nullable Duration resourceStabilizationTimeout, +Duration maximumDelayForTrigger, +Temporal lastStateTransition); +} + /** * Consolidated settings for the adaptive scheduler. This class is used to avoid passing around * multiple config options. @@ -349,7 +369,7 @@ public class AdaptiveScheduler } private final Settings settings; -private final StateTransitionManager.Factory stateTransitionManagerFactory; +private final StateTransitionManagerFactory stateTransitionManagerFactory; private final JobGraph jobGraph; @@ -427,7 +447,7 @@ public class AdaptiveScheduler throws JobExecutionException { this( settings, -DefaultStateTransitionManager.Factory.fromSettings(settings), +DefaultStateTransitionManager::new, (metricGroup, checkpointStatsListener) -> new DefaultCheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), @@ -455,7 +475,7 @@ public class AdaptiveScheduler @VisibleForTesting AdaptiveScheduler( Settings settings, -StateTransitionManager.Factory stateTransitionManagerFactory, +StateTransitionManagerFactory stateTransitionManagerFactory, BiFunction checkpointStatsTrackerFactory, JobGraph jobGraph, @@ -1143,10 +1163,20 @@ public class AdaptiveScheduler this, LOG, settings.getInitialResourceAllocationTimeout(), -settings.getResourceStabilizationTimeout(), +this::createWaitingForResourceStateTransitionManager, 
previousExecutionGraph)); } +private StateTransitionManager createWaitingForResourceStateTransitionManager( +StateTransitionManager.Context ctx) { +return stateTransitionManagerFactory.create( +ctx, +Duration.Z
(flink) branch master updated: [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c869326d089 [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager c869326d089 is described below commit c869326d089705475481c2c2ea42a6efabb8c828 Author: Zdenek Tison AuthorDate: Fri Jul 12 15:01:55 2024 +0200 [FLINK-36011] [runtime] Generalize RescaleManager to become StateTransitionManager --- .../scheduler/adaptive/AdaptiveScheduler.java | 10 +- .../scheduler/adaptive/DefaultRescaleManager.java | 232 --- .../adaptive/DefaultStateTransitionManager.java| 434 + .../runtime/scheduler/adaptive/Executing.java | 38 +- ...aleManager.java => StateTransitionManager.java} | 39 +- .../adaptive/AdaptiveSchedulerBuilder.java | 14 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 4 +- .../adaptive/DefaultRescaleManagerTest.java| 675 --- .../DefaultStateTransitionManagerTest.java | 722 + .../runtime/scheduler/adaptive/ExecutingTest.java | 41 +- ...ger.java => TestingStateTransitionManager.java} | 16 +- 11 files changed, 1250 insertions(+), 975 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 2fd7b694b73..ebcfc369c82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -349,7 +349,7 @@ public class AdaptiveScheduler } private final Settings settings; -private final RescaleManager.Factory rescaleManagerFactory; +private final StateTransitionManager.Factory stateTransitionManagerFactory; private final JobGraph jobGraph; @@ -427,7 +427,7 @@ public class AdaptiveScheduler throws 
JobExecutionException { this( settings, -DefaultRescaleManager.Factory.fromSettings(settings), +DefaultStateTransitionManager.Factory.fromSettings(settings), (metricGroup, checkpointStatsListener) -> new DefaultCheckpointStatsTracker( configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE), @@ -455,7 +455,7 @@ public class AdaptiveScheduler @VisibleForTesting AdaptiveScheduler( Settings settings, -RescaleManager.Factory rescaleManagerFactory, +StateTransitionManager.Factory stateTransitionManagerFactory, BiFunction checkpointStatsTrackerFactory, JobGraph jobGraph, @@ -480,7 +480,7 @@ public class AdaptiveScheduler assertPreconditions(jobGraph); this.settings = settings; -this.rescaleManagerFactory = rescaleManagerFactory; +this.stateTransitionManagerFactory = stateTransitionManagerFactory; this.jobGraph = jobGraph; this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName()); @@ -1175,7 +1175,7 @@ public class AdaptiveScheduler this, userCodeClassLoader, failureCollection, -rescaleManagerFactory, +stateTransitionManagerFactory, settings.getMinParallelismChangeForDesiredRescale(), settings.getRescaleOnFailedCheckpointCount())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java deleted file mode 100644 index 0b0fc013357..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific langu
(flink) 01/02: [hotfix][runtime] Adds ComponentMainThread to thread name of ComponentMainThreadExecutorServiceAdapter
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e207a4598834b6665dbd12e6f964e924dfa67b1d Author: Matthias Pohl AuthorDate: Thu Aug 1 08:43:56 2024 +0200 [hotfix][runtime] Adds ComponentMainThread to thread name of ComponentMainThreadExecutorServiceAdapter --- .../runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java| 1 + 1 file changed, 1 insertion(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java index 55ac8bce611..690ccecf227 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java @@ -78,6 +78,7 @@ public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainT @Nonnull ScheduledExecutorService singleThreadExecutor) { final Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join(); +thread.setName(String.format("ComponentMainThread-%s", thread.getName())); return new ComponentMainThreadExecutorServiceAdapter(singleThreadExecutor, thread); }
(flink) 02/02: [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper lifecycle management (i.e. closing the scheduler before shutting down the main thread executor)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7cff1da89cbd4f35b12c61a3e109db4a55682e44 Author: Matthias Pohl AuthorDate: Thu Aug 1 11:51:02 2024 +0200 [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper lifecycle management (i.e. closing the scheduler before shutting down the main thread executor) --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 622 +++-- 1 file changed, 336 insertions(+), 286 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 70babe57449..ab0b96fa765 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -119,6 +119,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; @@ -188,12 +190,55 @@ public class AdaptiveSchedulerTest { private final ClassLoader classLoader = ClassLoader.getSystemClassLoader(); +private AdaptiveScheduler scheduler; + +@BeforeEach +void before() { +scheduler = null; +} + +@AfterEach +void after() { +closeInExecutorService(scheduler, singleThreadMainThreadExecutor); +} + +private static void closeInExecutorService( +@Nullable AdaptiveScheduler scheduler, ComponentMainThreadExecutor executor) { +if (scheduler != null) { +final CompletableFuture closeFuture = new CompletableFuture<>(); +executor.execute( +() -> { +try { +// no 
matter what state the scheduler is in; we have to go to Finished +// state to please the Preconditions of the close call +if (scheduler.getState().getClass() != Finished.class) { +scheduler.goToFinished( +scheduler.getArchivedExecutionGraph( +JobStatus.CANCELED, null)); +} +FutureUtils.forward(scheduler.closeAsync(), closeFuture); +} catch (Throwable t) { +closeFuture.completeExceptionally(t); +} +}); +assertThatFuture(closeFuture).eventuallySucceeds(); +} +} + +private void startTestInstanceInMainThread() { +runInMainThread(() -> scheduler.startScheduling()); +} + +private void runInMainThread(Runnable callback) { +CompletableFuture.runAsync(callback, singleThreadMainThreadExecutor).join(); +} + @Test void testInitialState() throws Exception { -final AdaptiveScheduler scheduler = +scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), -mainThreadExecutor, +singleThreadMainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .build(); @@ -206,12 +251,14 @@ public class AdaptiveSchedulerTest { jobGraph.setSnapshotSettings( new JobCheckpointingSettings( CheckpointCoordinatorConfiguration.builder().build(), null)); - -final ArchivedExecutionGraph archivedExecutionGraph = +scheduler = new AdaptiveSchedulerBuilder( -jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) -.build() -.getArchivedExecutionGraph(JobStatus.INITIALIZING, null); +jobGraph, +singleThreadMainThreadExecutor, +EXECUTOR_RESOURCE.getExecutor()) +.build(); +final ArchivedExecutionGraph archivedExecutionGraph = +scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, null); ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedExecutionGraph); } @@ -223,11 +270,14 @@ public class AdaptiveSchedulerTest { new JobCheckpointingSettings( CheckpointCoordinatorConfiguration.builder().bu
(flink) branch master updated (2ca359a140b -> 7cff1da89cb)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 2ca359a140b [FLINK-36116] Update Javadoc plugin. This closes #25265 new e207a459883 [hotfix][runtime] Adds ComponentMainThread to thread name of ComponentMainThreadExecutorServiceAdapter new 7cff1da89cb [FLINK-36168][runtime] Refactors AdaptiveSchedulerTest to execute proper lifecycle management (i.e. closing the scheduler before shutting down the main thread executor) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../ComponentMainThreadExecutorServiceAdapter.java | 1 + .../scheduler/adaptive/AdaptiveSchedulerTest.java | 622 +++-- 2 files changed, 337 insertions(+), 286 deletions(-)
(flink) branch master updated: [FLINK-33607][build] Updates Maven wrapper version and enables checksum verification for Maven wrapper
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e9dd4683f75 [FLINK-33607][build] Updates Maven wrapper version and enables checksum verification for Maven wrapper e9dd4683f75 is described below commit e9dd4683f758b463d0b5ee18e49cecef6a70c5cf Author: Luke Chen AuthorDate: Wed Aug 14 10:52:40 2024 +0800 [FLINK-33607][build] Updates Maven wrapper version and enables checksum verification for Maven wrapper --- .mvn/wrapper/maven-wrapper.properties | 5 +- mvnw | 256 +++--- mvnw.cmd | 21 +-- 3 files changed, 153 insertions(+), 129 deletions(-) diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties index 20a44b1df3f..0fe40ae8c5c 100644 --- a/.mvn/wrapper/maven-wrapper.properties +++ b/.mvn/wrapper/maven-wrapper.properties @@ -18,6 +18,5 @@ # updating the Maven version requires updates to certain documentation and verification logic distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.6/apache-maven-3.8.6-bin.zip distributionSha256Sum=ccf20a80e75a17ffc34d47c5c95c98c39d426ca17d670f09cd91e877072a9309 -wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar -# TODO FLINK-33607: checksum verification doesn't seem to work under windows -# wrapperSha256Sum=e63a53cfb9c4d291ebe3c2b0edacb7622bbc480326beaa5a0456e412f52f066a +wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.3.2/maven-wrapper-3.3.2.jar +wrapperSha256Sum=3d8f20ce6103913be8b52aef6d994e0c54705fb527324ceb9b835b338739c7a8 diff --git a/mvnw b/mvnw index 8d937f4c14f..5e9618cac26 100755 --- a/mvnw +++ b/mvnw @@ -19,7 +19,7 @@ # # -# Apache Maven Wrapper startup batch script, version 3.2.0 +# Apache Maven Wrapper startup batch script, version 3.3.2 # # Required ENV 
vars: # -- @@ -33,75 +33,84 @@ # MAVEN_SKIP_RC - flag to disable loading of mavenrc files # -if [ -z "$MAVEN_SKIP_RC" ] ; then +if [ -z "$MAVEN_SKIP_RC" ]; then - if [ -f /usr/local/etc/mavenrc ] ; then + if [ -f /usr/local/etc/mavenrc ]; then . /usr/local/etc/mavenrc fi - if [ -f /etc/mavenrc ] ; then + if [ -f /etc/mavenrc ]; then . /etc/mavenrc fi - if [ -f "$HOME/.mavenrc" ] ; then + if [ -f "$HOME/.mavenrc" ]; then . "$HOME/.mavenrc" fi fi # OS specific support. $var _must_ be set to either true or false. -cygwin=false; -darwin=false; +cygwin=false +darwin=false mingw=false case "$(uname)" in - CYGWIN*) cygwin=true ;; - MINGW*) mingw=true;; - Darwin*) darwin=true -# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home -# See https://developer.apple.com/library/mac/qa/qa1170/_index.html -if [ -z "$JAVA_HOME" ]; then - if [ -x "/usr/libexec/java_home" ]; then -JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME - else -JAVA_HOME="/Library/Java/Home"; export JAVA_HOME - fi +CYGWIN*) cygwin=true ;; +MINGW*) mingw=true ;; +Darwin*) + darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then +if [ -x "/usr/libexec/java_home" ]; then + JAVA_HOME="$(/usr/libexec/java_home)" + export JAVA_HOME +else + JAVA_HOME="/Library/Java/Home" + export JAVA_HOME fi -;; + fi + ;; esac -if [ -z "$JAVA_HOME" ] ; then - if [ -r /etc/gentoo-release ] ; then +if [ -z "$JAVA_HOME" ]; then + if [ -r /etc/gentoo-release ]; then JAVA_HOME=$(java-config --jre-home) fi fi # For Cygwin, ensure paths are in UNIX format before anything is touched -if $cygwin ; then - [ -n "$JAVA_HOME" ] && -JAVA_HOME=$(cygpath --unix "$JAVA_HOME") - [ -n "$CLASSPATH" ] && -CLASSPATH=$(cygpath --path --unix "$CLASSPATH") +if $cygwin; then + [ -n "$JAVA_HOME" ] \ +&& JAVA_HOME=$(cygpath --unix "$JAVA_HOME") + [ -n "$CLASSPATH" ] \ +&& 
CLASSPATH=$(cygpath --path --unix "$CLASSPATH") fi # For Mingw, ensure paths are in UNIX format before anything is touched -if $mingw ; then - [ -n "$JAVA_HO
(flink) branch master updated: [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1f03a0a62ec [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state 1f03a0a62ec is described below commit 1f03a0a62ec371d24b194759e6c95bf661501e07 Author: David Moravek AuthorDate: Thu Oct 26 12:52:04 2023 +0200 [FLINK-35553][runtime] Wire-up RescaleManager with CheckpointStatsListener in Executing state --- .../generated/all_jobmanager_section.html | 8 +- .../generated/expert_scheduling_section.html | 8 +- .../generated/job_manager_configuration.html | 8 +- .../flink/configuration/JobManagerOptions.java | 21 ++- .../checkpoint/CheckpointStatsListener.java| 33 + .../checkpoint/DefaultCheckpointStatsTracker.java | 29 .../scheduler/adaptive/AdaptiveScheduler.java | 148 +-- .../scheduler/adaptive/DefaultRescaleManager.java | 43 +++--- .../runtime/scheduler/adaptive/Executing.java | 57 +++- .../flink/runtime/scheduler/adaptive/State.java| 34 - .../DefaultCheckpointStatsTrackerTest.java | 67 + .../adaptive/AdaptiveSchedulerBuilder.java | 40 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 156 - .../adaptive/DefaultRescaleManagerTest.java| 3 +- .../runtime/scheduler/adaptive/ExecutingTest.java | 128 - .../test/scheduling/RescaleOnCheckpointITCase.java | 146 +++ 16 files changed, 864 insertions(+), 65 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 4539ce365d5..760ca029e04 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -12,7 +12,7 @@ jobmanager.adaptive-scheduler.max-delay-for-scale-trigger (none) Duration -The maximum time the JobManager will wait with evaluating previously observed 
events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled). jobmanager.adaptive-scheduler.min-parallelism-increase @@ -32,6 +32,12 @@ Duration The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired. Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted). Setting a negative duration will disable the resource tim [...] + + jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count +2 +Integer +The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint. + jobmanager.adaptive-scheduler.scaling-interval.max (none) diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 10e5ad134ce..6be6547547c 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -90,7 +90,7 @@ jobmanager.adaptive-scheduler.max-delay-for-scale-trigger (none) Duration -The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).
+The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled). jobmanager.adaptive-scheduler.min-parallelism-increase @@ -110,6 +110,12 @@ Duration The maximum time the JobManager will wait to acquire all required resources after a job
(flink) 04/06: [hotfix][runtime] Make CheckpointStatsTracker not rely on numberOfTotalSubtasks
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 91cf5d823ed6cb8c899e69187765dedc59c060c7 Author: Matthias Pohl AuthorDate: Fri May 31 15:58:34 2024 +0200 [hotfix][runtime] Make CheckpointStatsTracker not rely on numberOfTotalSubtasks I double-checked that the JobInitializationMetricsBuilder constructor is called for every local restart of tasks and the JobInitializationMetricsBuilder#reportInitializationMetrics is called by the deployed Subtask after restoring its state. ExecutionAttemptID is correct because the builder creation happens again when a new Execution attempt is triggered (through the local restart). The totalNumberOfSubtasks was a bit misleading here, in my opinion. --- .../runtime/checkpoint/CheckpointCoordinator.java | 12 +++- .../checkpoint/CheckpointCoordinatorGateway.java | 4 +- .../runtime/checkpoint/CheckpointStatsTracker.java | 41 .../JobInitializationMetricsBuilder.java | 61 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 6 +- .../scheduler/DefaultExecutionGraphFactory.java| 20 +- .../runtime/scheduler/ExecutionGraphHandler.java | 7 +- .../flink/runtime/scheduler/SchedulerBase.java | 7 +- .../flink/runtime/scheduler/SchedulerNG.java | 4 +- .../scheduler/adaptive/AdaptiveScheduler.java | 7 +- .../adaptive/StateWithExecutionGraph.java | 7 +- .../flink/runtime/state/TaskStateManagerImpl.java | 3 +- .../taskexecutor/rpc/RpcCheckpointResponder.java | 7 +- .../runtime/taskmanager/CheckpointResponder.java | 4 +- .../CheckpointCoordinatorFailureTest.java | 6 +- .../CheckpointCoordinatorMasterHooksTest.java | 5 +- .../checkpoint/CheckpointCoordinatorTest.java | 23 --- .../CheckpointCoordinatorTestingUtils.java | 5 +- .../checkpoint/CheckpointStatsTrackerTest.java | 74 +- .../checkpoint/JobInitializationMetricsTest.java | 22 +-- .../flink/runtime/dispatcher/DispatcherTest.java | 4 +- 
.../TestingDefaultExecutionGraphBuilder.java | 8 ++- .../jobmaster/utils/TestingJobMasterGateway.java | 4 +- .../OperatorCoordinatorHolderTest.java | 6 +- .../AbstractCheckpointStatsHandlerTest.java| 4 +- .../DefaultExecutionGraphFactoryTest.java | 33 +- .../runtime/scheduler/TestingSchedulerNG.java | 4 +- .../taskmanager/NoOpCheckpointResponder.java | 4 +- .../taskmanager/TestCheckpointResponder.java | 4 +- .../streaming/state/RocksDBAsyncSnapshotTest.java | 4 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- .../util/CompletingCheckpointResponder.java| 4 +- 32 files changed, 246 insertions(+), 162 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 369a3d6f177..7d318bb4cc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -84,6 +84,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toMap; @@ -1760,7 +1761,14 @@ public class CheckpointCoordinator { throw new IllegalStateException("CheckpointCoordinator is shut down"); } long restoreTimestamp = SystemClock.getInstance().absoluteTimeMillis(); -statsTracker.reportInitializationStartTs(restoreTimestamp); +statsTracker.reportInitializationStarted( +tasks.stream() +.map(ExecutionJobVertex::getTaskVertices) +.flatMap(Stream::of) +.map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getAttemptId) +.collect(Collectors.toSet()), +restoreTimestamp); // Restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); @@ -2158,7 
+2166,7 @@ public class CheckpointCoordinator { public void reportInitializationMetrics( ExecutionAttemptID executionAttemptID, SubTaskInitializationMetrics initializationMetrics) { -statsTracker.reportInitializationMetrics(initializationMetrics); +statsTracker.reportInitializationMetrics(executionAttemptID, initializat
(flink) 05/06: [hotfix][runtime] Extracts CheckpointStatsTracker into interface
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7c67d011dd38167336282280c97853f4e009104f Author: Matthias Pohl AuthorDate: Thu Jun 20 16:33:30 2024 +0200 [hotfix][runtime] Extracts CheckpointStatsTracker into interface - Introduces DefaultCheckpointStatsTracker - Introduces NoOpCheckpointStatsTracker --- .../runtime/checkpoint/CheckpointCoordinator.java | 7 +- .../runtime/checkpoint/CheckpointStatsTracker.java | 558 ++--- ...ker.java => DefaultCheckpointStatsTracker.java} | 83 +-- .../checkpoint/NoOpCheckpointStatsTracker.java | 81 +++ .../scheduler/DefaultExecutionGraphFactory.java| 3 +- .../CheckpointCoordinatorFailureTest.java | 2 +- .../CheckpointCoordinatorMasterHooksTest.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 14 +- .../CheckpointCoordinatorTestingUtils.java | 2 +- .../CheckpointCoordinatorTriggeringTest.java | 35 ++ ...java => DefaultCheckpointStatsTrackerTest.java} | 78 +-- .../flink/runtime/dispatcher/DispatcherTest.java | 3 +- .../TestingDefaultExecutionGraphBuilder.java | 8 +- .../OperatorCoordinatorHolderTest.java | 7 +- .../AbstractCheckpointStatsHandlerTest.java| 3 +- .../DefaultExecutionGraphFactoryTest.java | 2 + 16 files changed, 238 insertions(+), 650 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 7d318bb4cc9..1606cad4126 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -2429,7 +2429,12 @@ public class CheckpointCoordinator { } private void reportFinishedTasks( -PendingCheckpointStats pendingCheckpointStats, List finishedTasks) { +@Nullable PendingCheckpointStats 
pendingCheckpointStats, +List finishedTasks) { +if (pendingCheckpointStats == null) { +return; +} + long now = System.currentTimeMillis(); finishedTasks.forEach( execution -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index 654bc8c3422..b620c0ce4c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -18,34 +18,16 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; -import org.apache.flink.runtime.rest.util.RestMapperUtils; -import org.apache.flink.traces.Span; -import org.apache.flink.traces.SpanBuilder; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.io.StringWriter; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; -import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Tracker for checkpoint statistics. 
@@ -63,141 +45,7 @@ import static org.apache.flink.util.Preconditions.checkState; * The statistics are accessed via {@link #createSnapshot()} and exposed via both the web * frontend and the {@link Metric} system. */ -public class CheckpointStatsTracker { - -private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsTracker.class); -private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper(); - -/** - * Lock used to update stats and creating snapshots. Updates always happen from a single Thread - * at a time and there can be multiple concurrent read accesses to the latest stats snapshot. - * - * Currently, writes are executed by whatever Thread executes the coordinator actions (which - * already happens in locked scope). Reads can come from multiple concurrent Netty e
(flink) 06/06: [FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 482969aa24df7633a0bbaac74c4d4f6cd6ab1333 Author: Matthias Pohl AuthorDate: Thu Jun 20 16:33:30 2024 +0200 [FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed. The checkpoint tracker doesn't live in the DefaultExecutionGraphFactory anymore but is moved into the AdaptiveScheduler. This will allow the scheduler to react to checkpoint-related events. --- .../flink/util/function/CachingSupplier.java | 42 --- .../flink/util/function/CachingSupplierTest.java | 38 - .../executiongraph/DefaultExecutionGraph.java | 6 --- .../DefaultExecutionGraphBuilder.java | 5 +-- .../runtime/executiongraph/ExecutionGraph.java | 3 -- .../scheduler/DefaultExecutionGraphFactory.java| 14 +-- .../runtime/scheduler/ExecutionGraphFactory.java | 4 ++ .../runtime/scheduler/ExecutionGraphHandler.java | 25 +--- .../flink/runtime/scheduler/SchedulerBase.java | 13 ++ .../flink/runtime/scheduler/SchedulerUtils.java| 12 ++ .../scheduler/adaptive/AdaptiveScheduler.java | 11 + .../TestingDefaultExecutionGraphBuilder.java | 19 - .../DefaultExecutionGraphFactoryTest.java | 47 +- .../adaptive/StateTrackingMockExecutionGraph.java | 6 --- 14 files changed, 100 insertions(+), 145 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java b/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java deleted file mode 100644 index d1bfce1fe53..000 --- a/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.util.function; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; - -import java.util.function.Supplier; - -/** A {@link Supplier} that returns a single, lazily instantiated, value. */ -@NotThreadSafe -public class CachingSupplier implements Supplier { -private final Supplier backingSupplier; -private @Nullable T cachedValue; - -public CachingSupplier(Supplier backingSupplier) { -this.backingSupplier = backingSupplier; -} - -@Override -public T get() { -if (cachedValue == null) { -cachedValue = backingSupplier.get(); -} -return cachedValue; -} -} diff --git a/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java b/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java deleted file mode 100644 index c2ed0d7018c..000 --- a/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.flink.util.function; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.junit.jupiter.api.Test; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -import static org.assertj.core.api.Assertions.assertThat; - -class CachingSupplierTest { - -@Test -void testCaching() { -final AtomicInteger instantiationCounts = new AtomicInteg
(flink) 03/06: [hotfix][runtime] Adds missing @Nullable annotation
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 446b582ed701e8c6f9f2c98aaacab1ea7ee0b64e Author: Matthias Pohl AuthorDate: Fri May 31 16:16:39 2024 +0200 [hotfix][runtime] Adds missing @Nullable annotation --- .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index e9b1317e46e..44fa1a9b68b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -67,7 +67,7 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph { executionGraphWithParallelismFuture, Logger logger, OperatorCoordinatorHandlerFactory operatorCoordinatorFactory, -ExecutionGraph previousExecutionGraph1) { +@Nullable ExecutionGraph previousExecutionGraph) { super(context, logger); this.context = context; this.operatorCoordinatorHandlerFactory = operatorCoordinatorFactory; @@ -83,7 +83,7 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph { Duration.ZERO); return null; })); -previousExecutionGraph = previousExecutionGraph1; +this.previousExecutionGraph = previousExecutionGraph; } private void handleExecutionGraphCreation(
(flink) 02/06: [hotfix][runtime] Remove no longer used CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 097fc7d470ee8ab37440a96ca0d9a5eb721f5204 Author: David Moravek AuthorDate: Thu Sep 28 11:15:37 2023 -0700 [hotfix][runtime] Remove no longer used CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method. --- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 16 1 file changed, 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c05efb10b48..369a3d6f177 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1625,22 +1625,6 @@ public class CheckpointCoordinator { } } -/** - * Fails all pending checkpoints which have not been acknowledged by the given execution attempt - * id. - * - * @param executionAttemptId for which to discard unacknowledged pending checkpoints - * @param cause of the failure - */ -public void failUnacknowledgedPendingCheckpointsFor( -ExecutionAttemptID executionAttemptId, Throwable cause) { -synchronized (lock) { -abortPendingCheckpoints( -checkpoint -> !checkpoint.isAcknowledgedBy(executionAttemptId), -new CheckpointException(CheckpointFailureReason.TASK_FAILURE, cause)); -} -} - private void rememberRecentExpiredCheckpointId(long id) { if (recentExpiredCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) { recentExpiredCheckpoints.removeFirst();
(flink) 01/06: [hotfix][runtime] Removes unused method
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit be938287bfa45538bdcaf0491ef9583841e3447f Author: Matthias Pohl AuthorDate: Fri May 31 14:43:43 2024 +0200 [hotfix][runtime] Removes unused method --- .../org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java | 5 - 1 file changed, 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index ea04211d6f0..fbdf97bd408 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -146,11 +146,6 @@ public class CheckpointStatsTracker { return this; } -@VisibleForTesting -Optional getJobInitializationMetricsBuilder() { -return jobInitializationMetricsBuilder; -} - /** * Creates a new snapshot of the available stats. *
(flink) branch master updated (0b60bba6715 -> 482969aa24d)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 0b60bba6715 Revert "[FLINK-33211][table] support flink table lineage (#24618)" new be938287bfa [hotfix][runtime] Removes unused method new 097fc7d470e [hotfix][runtime] Remove no longer used CheckpointCoordinator#failUnacknowledgedPendingCheckpointsFor method. new 446b582ed70 [hotfix][runtime] Adds missing @Nullable annotation new 91cf5d823ed [hotfix][runtime] Make CheckpointStatsTracker not rely on numberOfTotalSubtasks new 7c67d011dd3 [hotfix][runtime] Extracts CheckpointStatsTracker into interface new 482969aa24d [FLINK-35552][runtime] Rework how CheckpointStatsTracker is constructed. The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/util/function/CachingSupplier.java | 42 -- .../flink/util/function/CachingSupplierTest.java | 38 -- .../runtime/checkpoint/CheckpointCoordinator.java | 35 +- .../checkpoint/CheckpointCoordinatorGateway.java | 4 +- .../runtime/checkpoint/CheckpointStatsTracker.java | 580 ++--- ...ker.java => DefaultCheckpointStatsTracker.java} | 129 ++--- .../JobInitializationMetricsBuilder.java | 61 ++- .../checkpoint/NoOpCheckpointStatsTracker.java | 81 +++ .../executiongraph/DefaultExecutionGraph.java | 6 - .../DefaultExecutionGraphBuilder.java | 5 +- .../runtime/executiongraph/ExecutionGraph.java | 3 - .../apache/flink/runtime/jobmaster/JobMaster.java | 6 +- .../scheduler/DefaultExecutionGraphFactory.java| 29 +- .../runtime/scheduler/ExecutionGraphFactory.java | 4 + .../runtime/scheduler/ExecutionGraphHandler.java | 28 +- .../flink/runtime/scheduler/SchedulerBase.java | 20 +- .../flink/runtime/scheduler/SchedulerNG.java | 4 +- 
.../flink/runtime/scheduler/SchedulerUtils.java| 12 + .../scheduler/adaptive/AdaptiveScheduler.java | 18 +- .../scheduler/adaptive/CreatingExecutionGraph.java | 4 +- .../adaptive/StateWithExecutionGraph.java | 7 +- .../flink/runtime/state/TaskStateManagerImpl.java | 3 +- .../taskexecutor/rpc/RpcCheckpointResponder.java | 7 +- .../runtime/taskmanager/CheckpointResponder.java | 4 +- .../CheckpointCoordinatorFailureTest.java | 8 +- .../CheckpointCoordinatorMasterHooksTest.java | 5 +- .../checkpoint/CheckpointCoordinatorTest.java | 35 +- .../CheckpointCoordinatorTestingUtils.java | 5 +- .../CheckpointCoordinatorTriggeringTest.java | 35 ++ ...java => DefaultCheckpointStatsTrackerTest.java} | 138 +++-- .../checkpoint/JobInitializationMetricsTest.java | 22 +- .../flink/runtime/dispatcher/DispatcherTest.java | 7 +- .../TestingDefaultExecutionGraphBuilder.java | 21 +- .../jobmaster/utils/TestingJobMasterGateway.java | 4 +- .../OperatorCoordinatorHolderTest.java | 5 +- .../AbstractCheckpointStatsHandlerTest.java| 7 +- .../DefaultExecutionGraphFactoryTest.java | 66 ++- .../runtime/scheduler/TestingSchedulerNG.java | 4 +- .../adaptive/StateTrackingMockExecutionGraph.java | 6 - .../taskmanager/NoOpCheckpointResponder.java | 4 +- .../taskmanager/TestCheckpointResponder.java | 4 +- .../streaming/state/RocksDBAsyncSnapshotTest.java | 4 +- .../tasks/TaskCheckpointingBehaviourTest.java | 4 +- .../util/CompletingCheckpointResponder.java| 4 +- 44 files changed, 552 insertions(+), 966 deletions(-) delete mode 100644 flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java delete mode 100644 flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java copy flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/{CheckpointStatsTracker.java => DefaultCheckpointStatsTracker.java} (83%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/NoOpCheckpointStatsTracker.java rename 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/{CheckpointStatsTrackerTest.java => DefaultCheckpointStatsTrackerTest.java} (82%)
(flink) branch master updated: Revert "[FLINK-33211][table] support flink table lineage (#24618)"
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0b60bba6715 Revert "[FLINK-33211][table] support flink table lineage (#24618)" 0b60bba6715 is described below commit 0b60bba6715991d3bc9332956ca6abfef983ca79 Author: Matthias Pohl AuthorDate: Wed Jul 3 08:34:07 2024 +0200 Revert "[FLINK-33211][table] support flink table lineage (#24618)" This reverts commit 960363cd3c6c82f7e56ef781295756105f7b5eba. --- .../api/functions/source/FromElementsFunction.java | 30 + .../flink/streaming/api/graph/StreamGraph.java | 9 -- .../streaming/api/graph/StreamGraphGenerator.java | 6 +- .../api/lineage/DefaultLineageDataset.java | 54 .../streaming/api/lineage/DefaultLineageEdge.java | 46 --- .../streaming/api/lineage/DefaultLineageGraph.java | 8 +- .../flink/streaming/api/lineage/LineageGraph.java | 5 +- .../streaming/api/lineage/LineageGraphUtils.java | 86 .../transformations/LegacySinkTransformation.java | 2 +- .../LegacySourceTransformation.java| 2 +- .../api/transformations/SinkTransformation.java| 2 +- .../api/transformations/SourceTransformation.java | 2 +- .../transformations/TransformationWithLineage.java | 75 --- .../translators/SinkTransformationTranslator.java | 2 +- .../apache/flink/table/operations/ModifyType.java | 31 - .../table/operations/SinkModifyOperation.java | 10 ++ flink-table/flink-table-planner/pom.xml| 9 +- .../table/planner/lineage/TableLineageDataset.java | 37 - .../planner/lineage/TableLineageDatasetImpl.java | 100 -- .../table/planner/lineage/TableLineageUtils.java | 93 - .../planner/lineage/TableSinkLineageVertex.java| 34 - .../lineage/TableSinkLineageVertexImpl.java| 47 --- .../planner/lineage/TableSourceLineageVertex.java | 26 .../lineage/TableSourceLineageVertexImpl.java | 47 --- .../operations/SqlNodeToOperationConversion.java | 8 +- 
.../plan/nodes/exec/common/CommonExecSink.java | 54 ++-- .../exec/common/CommonExecTableSourceScan.java | 61 ++--- .../flink/connector/source/ValuesSource.java | 44 +- .../factories/TestValuesRuntimeFunctions.java | 96 ++--- .../plan/batch/sql/TableLineageGraphTest.java | 37 - .../plan/common/TableLineageGraphTestBase.java | 150 - .../plan/nodes/exec/TransformationsTest.java | 20 --- .../plan/stream/sql/TableLineageGraphTest.java | 37 - .../test/resources/lineage-graph/query-batch.json | 70 -- .../test/resources/lineage-graph/query-stream.json | 70 -- .../test/resources/lineage-graph/union-batch.json | 117 .../test/resources/lineage-graph/union-stream.json | 117 .../flink/table/planner/utils/TableTestBase.scala | 17 --- .../operators/values/ValuesInputFormat.java| 29 +--- pom.xml| 1 - 40 files changed, 60 insertions(+), 1631 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index c183b6494e2..aff13245afa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -25,18 +25,12 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.lineage.DefaultLineageDataset; -import org.apache.flink.streaming.api.lineage.LineageDataset; -import org.apache.flink.streaming.api.lineage.LineageVertex; -import org.apache.flink.streaming.api.lineage.LineageVertexProvider; -import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.util.It
(flink) 01/02: [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit ab856fc6c8b1cdd402c11f7755b01a51aafd8f4c Author: morazow AuthorDate: Fri Mar 1 11:38:36 2024 +0100 [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow Co-authored-by: Matthias Pohl --- .github/actions/stringify/action.yml| 42 + .github/workflows/nightly.yml | 48 + .github/workflows/template.flink-ci.yml | 15 --- 3 files changed, 95 insertions(+), 10 deletions(-) diff --git a/.github/actions/stringify/action.yml b/.github/actions/stringify/action.yml new file mode 100644 index 000..e05547f78dd --- /dev/null +++ b/.github/actions/stringify/action.yml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: "Stringify" +description: "Stringifies the given input value." +inputs: + value: +description: "Input value to stringify." +required: true +outputs: + stringified_value: +description: "Stringified output value." 
+value: ${{ steps.stringify-step.outputs.stringified_value }} +runs: + using: "composite" + steps: +- name: "Stringify '${{ inputs.value }}'" + id: stringify-step + shell: bash + run: | +# adds a stringified version of the workflow name that can be used for generating unique build artifact names within a composite workflow +# - replaces any special characters (except for underscores and dots) with dashes +# - makes the entire string lowercase +# - condenses multiple dashes into a single one +# - removes leading and following dashes +stringified_value=$(echo "${{ inputs.value }}" | tr -C '[:alnum:]._' '-' | tr '[:upper:]' '[:lower:]' | sed -e 's/--*/-/g' -e 's/^-*//g' -e 's/-*$//g') +echo "stringified_value=${stringified_value}" >> $GITHUB_OUTPUT diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 2090a3b1156..67994be49a5 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -94,3 +94,51 @@ jobs: s3_bucket: ${{ secrets.IT_CASE_S3_BUCKET }} s3_access_key: ${{ secrets.IT_CASE_S3_ACCESS_KEY }} s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }} + + build_python_wheels: +name: "Build Python Wheels on ${{ matrix.os }}" +runs-on: ${{ matrix.os }} +strategy: + fail-fast: false + matrix: +include: + - os: ubuntu-latest +os_name: linux +python-version: 3.9 + - os: macos-latest +os_name: macos +python-version: 3.9 +steps: + - name: "Checkout the repository" +uses: actions/checkout@v4 +with: + fetch-depth: 0 + persist-credentials: false + - name: "Install python" +uses: actions/setup-python@v5 +with: + python-version: ${{ matrix.python-version }} + - name: "Stringify workflow name" +uses: "./.github/actions/stringify" +id: stringify_workflow +with: + value: ${{ github.workflow }} + - name: "Build python wheels for ${{ matrix.os_name }}" +if: matrix.os_name == 'linux' +run: | + cd flink-python + bash dev/build-wheels.sh + - name: "Build python wheels for ${{ matrix.os }}" +if: matrix.os_name == 'macos' +run: | + cd 
flink-python + python -m pip install --upgrade pip + pip install cibuildwheel==2.8.0 + cibuildwheel --platform macos --output-dir dist . + - name: "Tar python artifact" +run: tar -czvf python-wheel-${{ matrix.os_name }}.tar.gz -C flink-python/dist . + - name: "Upload python artifact" +uses: actions/upload-artifact@v4 +with: + name: wheel_${
(flink) 02/02: [FLINK-34582] Updates cibuildwheel to support cpython 3.11 wheel package
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c75248e48f41d227be934c9754f7dd3f3b571e37 Author: morazow AuthorDate: Thu May 23 16:51:10 2024 +0200 [FLINK-34582] Updates cibuildwheel to support cpython 3.11 wheel package --- .github/workflows/nightly.yml | 39 +-- flink-python/pyproject.toml | 3 +++ 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 67994be49a5..adbdf660888 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -96,7 +96,7 @@ jobs: s3_secret_key: ${{ secrets.IT_CASE_S3_SECRET_KEY }} build_python_wheels: -name: "Build Python Wheels on ${{ matrix.os }}" +name: "Build Python Wheels on ${{ matrix.os_name }}" runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -104,37 +104,40 @@ jobs: include: - os: ubuntu-latest os_name: linux -python-version: 3.9 - os: macos-latest os_name: macos -python-version: 3.9 steps: - name: "Checkout the repository" uses: actions/checkout@v4 with: fetch-depth: 0 persist-credentials: false - - name: "Install python" -uses: actions/setup-python@v5 -with: - python-version: ${{ matrix.python-version }} - name: "Stringify workflow name" uses: "./.github/actions/stringify" id: stringify_workflow with: value: ${{ github.workflow }} + # Used to host cibuildwheel + - name: "Setup Python" +uses: actions/setup-python@v5 +with: + python-version: '3.x' + - name: "Install cibuildwheel" +run: python -m pip install cibuildwheel==2.16.5 - name: "Build python wheels for ${{ matrix.os_name }}" -if: matrix.os_name == 'linux' -run: | - cd flink-python - bash dev/build-wheels.sh - - name: "Build python wheels for ${{ matrix.os }}" -if: matrix.os_name == 'macos' -run: | - cd flink-python - python -m pip install --upgrade pip - pip install cibuildwheel==2.8.0 - cibuildwheel --platform macos --output-dir dist . 
+run: python -m cibuildwheel --output-dir flink-python/dist flink-python +env: + # Skip -musllinux on Linux builds + CIBW_SKIP: "*-musllinux*" + # Use manylinux2014 on Linux + CIBW_MANYLINUX_X86_64_IMAGE: manylinux2014 + CIBW_BEFORE_ALL_LINUX: pip install patchelf + CIBW_BEFORE_BUILD_LINUX: pip install -r flink-python/dev/dev-requirements.txt + CIBW_ENVIRONMENT_LINUX: CFLAGS="-I. -include ./dev/glibc_version_fix.h" + # Run auditwheel repair on Linux + CIBW_REPAIR_WHEEL_COMMAND_LINUX: "auditwheel repair -w {dest_dir} {wheel}" + # Skip repair on MacOS + CIBW_REPAIR_WHEEL_COMMAND_MACOS: "" - name: "Tar python artifact" run: tar -czvf python-wheel-${{ matrix.os_name }}.tar.gz -C flink-python/dist . - name: "Upload python artifact" diff --git a/flink-python/pyproject.toml b/flink-python/pyproject.toml index c21b2fccd5e..0ec9012fbc8 100644 --- a/flink-python/pyproject.toml +++ b/flink-python/pyproject.toml @@ -31,3 +31,6 @@ build = ["cp38-*", "cp39-*", "cp310-*", "cp311-*"] [tool.cibuildwheel.macos] archs = ["x86_64", "arm64"] + +[tool.cibuildwheel.linux] +archs = ["x86_64"]
(flink) branch master updated (e56b54db40a -> c75248e48f4)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from e56b54db40a [FLINK-35625][cli] Merge "flink run" and "flink run-application" functionality, deprecate "run-application" new ab856fc6c8b [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow new c75248e48f4 [FLINK-34582] Updates cibuildwheel to support cpython 3.11 wheel package The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/actions/stringify/action.yml| 42 +++ .github/workflows/nightly.yml | 51 + .github/workflows/template.flink-ci.yml | 15 -- flink-python/pyproject.toml | 3 ++ 4 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 .github/actions/stringify/action.yml
(flink) branch master updated: [FLINK-35551][runtime] Adds RescaleManager#onTrigger
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 14140db6a46 [FLINK-35551][runtime] Adds RescaleManager#onTrigger 14140db6a46 is described below commit 14140db6a4682384a8178e4eb2885103706abb79 Author: Matthias Pohl AuthorDate: Wed May 29 16:19:36 2024 +0200 [FLINK-35551][runtime] Adds RescaleManager#onTrigger - integrates trigger logic in DefaultRescaleManager - a new parameter jobmanager.adaptive-scheduler.max-delay-for-scale-trigger --- .../generated/all_jobmanager_section.html | 6 + .../generated/expert_scheduling_section.html | 6 + .../generated/job_manager_configuration.html | 6 + .../flink/configuration/JobManagerOptions.java | 19 +++ .../scheduler/adaptive/AdaptiveScheduler.java | 31 - .../adaptive/AdaptiveSchedulerFactory.java | 3 +- .../scheduler/adaptive/DefaultRescaleManager.java | 88 - .../runtime/scheduler/adaptive/Executing.java | 9 +- .../runtime/scheduler/adaptive/RescaleManager.java | 6 + .../adaptive/DefaultRescaleManagerTest.java| 145 +++-- .../runtime/scheduler/adaptive/ExecutingTest.java | 16 ++- .../scheduler/adaptive/TestingRescaleManager.java | 17 ++- 12 files changed, 321 insertions(+), 31 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 626aa50f386..4539ce365d5 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -8,6 +8,12 @@ + + jobmanager.adaptive-scheduler.max-delay-for-scale-trigger +(none) +Duration +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). 
+ jobmanager.adaptive-scheduler.min-parallelism-increase 1 diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 86682f4fbfd..10e5ad134ce 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -86,6 +86,12 @@ MemorySize The size of the write buffer of JobEventStore. The content will be flushed to external file system once the buffer is full + + jobmanager.adaptive-scheduler.max-delay-for-scale-trigger +(none) +Duration +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). + jobmanager.adaptive-scheduler.min-parallelism-increase 1 diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html index c01601a031a..df84946a5b0 100644 --- a/docs/layouts/shortcodes/generated/job_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html @@ -8,6 +8,12 @@ + + jobmanager.adaptive-scheduler.max-delay-for-scale-trigger +(none) +Duration +The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). 
+ jobmanager.adaptive-scheduler.min-parallelism-increase 1 diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 77bfce2a9f8..ab95a0e2669 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -39,6 +39,7 @@ import static org.apache.flink.configuration.description.TextElement.text; public class JobManagerOptions { public static final MemorySize MIN_JVM_HEAP_SIZE = MemorySize.ofMebiBytes(128); +public static final int FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = 3; /** * The config parameter defining the network address to connect to for communication with the @@ -573,6 +574,24 @@ public class JobManagerOptions { code
(flink) branch master updated (7f1399561b3 -> ac4a275f0fe)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 7f1399561b3 [FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility add c534e88bf12 [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework add ac4a275f0fe [FLINK-20398][test] solve deprecation in BatchSQLTest source table creation No new revisions were added by this update. Summary of changes: .../flink-batch-sql-test/pom.xml | 69 .../flink/sql/tests/BatchSQLTestProgram.java | 188 - .../org/apache/flink/sql/tests/BatchSQLTest.java | 154 + .../java/org/apache/flink/sql/tests/Generator.java | 82 + .../flink/sql/tests/GeneratorTableSource.java | 53 ++ .../sql/tests/GeneratorTableSourceFactory.java | 86 ++ .../org.apache.flink.table.factories.Factory | 16 ++ .../src/test/resources/log4j2-test.properties | 33 .../src/test/resources/sql-job-query.sql | 48 ++ flink-end-to-end-tests/run-nightly-tests.sh| 3 - .../test-scripts/test_batch_sql.sh | 82 - .../testframe/source/FromElementsSource.java | 42 - .../testframe/source/FromElementsSourceReader.java | 15 +- 13 files changed, 555 insertions(+), 316 deletions(-) delete mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratorTableSource.java create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratorTableSourceFactory.java create mode 100644 
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties create mode 100644 flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/sql-job-query.sql delete mode 100755 flink-end-to-end-tests/test-scripts/test_batch_sql.sh
(flink) branch master updated: [FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7f1399561b3 [FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility 7f1399561b3 is described below commit 7f1399561b3ff303ea48ec0b93eef79eaaf3e4c8 Author: Matthias Pohl AuthorDate: Thu May 23 16:56:56 2024 +0200 [FLINK-35550][runtime] Adds new component RescaleManager that handles the rescaling logic to improve code testability and extensibility Rescaling is a state-specific functionality. Moving all the logic into Executing state allows us to align the resource controlling in Executing state and WaitingForResources state in a future effort. --- .../scheduler/adaptive/AdaptiveScheduler.java | 64 +-- .../scheduler/adaptive/DefaultRescaleManager.java | 167 +++ .../runtime/scheduler/adaptive/Executing.java | 175 +++ .../runtime/scheduler/adaptive/RescaleManager.java | 64 +++ .../EnforceMinimalIncreaseRescalingController.java | 7 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 46 +- .../adaptive/DefaultRescaleManagerTest.java| 553 + .../runtime/scheduler/adaptive/ExecutingTest.java | 181 +-- .../scheduler/adaptive/TestingRescaleManager.java | 53 ++ ...orceMinimalIncreaseRescalingControllerTest.java | 12 +- 10 files changed, 971 insertions(+), 351 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 7a8f3ae6570..c5310de8b5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -57,7 +57,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore; @@ -110,9 +109,6 @@ import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation; import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; -import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController; -import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController; -import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics; @@ -147,8 +143,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; +import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE; import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking; /** @@ -228,7 +224,8 @@ public class AdaptiveScheduler .orElse(stabilizationTimeoutDefault), configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT), 
scalingIntervalMin, -scalingIntervalMax); +scalingIntervalMax, +configuration.get(MIN_PARALLELISM_INCREASE)); } private final SchedulerExecutionMode executionMode; @@ -237,6 +234,7 @@ public class AdaptiveScheduler private final Duration slotIdleTimeout; private final Duration scalingIntervalMin; private final Duration scalingIntervalMax; +private final int minParallelismChangeForDesiredRescale; private Settings( SchedulerExecutionMode executionMode, @@ -244,13 +242,15 @@ public class AdaptiveScheduler Duration resourceStabilizationTimeout, Duration slotIdleTimeout, Duration scalingIntervalMin, -Duration
(flink) branch master updated: Fix artifactId of flink-migration-test-utils module
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new fdd0f80bf70 Fix artifactId of flink-migration-test-utils module fdd0f80bf70 is described below commit fdd0f80bf701cd868c6a650beb1742ebb6bef814 Author: wforget <643348...@qq.com> AuthorDate: Thu Jun 6 15:59:19 2024 +0800 Fix artifactId of flink-migration-test-utils module --- flink-connectors/flink-hadoop-compatibility/pom.xml | 2 +- flink-core/pom.xml | 2 +- flink-formats/flink-avro/pom.xml | 2 +- flink-fs-tests/pom.xml | 2 +- flink-libraries/flink-cep/pom.xml| 2 +- flink-runtime/pom.xml| 2 +- flink-scala/pom.xml | 2 +- flink-streaming-java/pom.xml | 2 +- flink-table/flink-table-runtime/pom.xml | 2 +- flink-test-utils-parent/flink-migration-test-utils/README.md | 4 ++-- flink-test-utils-parent/flink-migration-test-utils/pom.xml | 2 +- flink-tests-java17/pom.xml | 2 +- flink-tests/pom.xml | 2 +- 13 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml index eb9ca61b47f..6b6307688b5 100644 --- a/flink-connectors/flink-hadoop-compatibility/pom.xml +++ b/flink-connectors/flink-hadoop-compatibility/pom.xml @@ -110,7 +110,7 @@ under the License. org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 4521e7464da..5525a986b80 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -175,7 +175,7 @@ under the License. 
org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index 4121f4c500d..d33a2726b19 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -199,7 +199,7 @@ under the License. org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml index 88157ae4257..83f84abde41 100644 --- a/flink-fs-tests/pom.xml +++ b/flink-fs-tests/pom.xml @@ -126,7 +126,7 @@ under the License. org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index 93ffdabe6ae..5cc14c45039 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -105,7 +105,7 @@ under the License. org.apache.flink -fink-migration-test-utils +flink-migration-test-utils ${project.version} test diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 6008fe1717e..6f7df598e9d 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -287,7 +287,7 @@ under the License. org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index 8f85d049aa1..f0a4791af42 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -127,7 +127,7 @@ under the License. org.apache.flink - fink-migration-test-utils + flink-migration-test-utils ${project.version} test diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 946289c1e5e..548aa0f401c 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -119,7 +119,7 @@ under the License. org.apache
(flink) branch FLINK-35550 deleted (was 04e7179738b)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch FLINK-35550 in repository https://gitbox.apache.org/repos/asf/flink.git was 04e7179738b JavaDoc fix This change permanently discards the following revisions: discard 04e7179738b JavaDoc fix
svn commit: r69740 - /dev/flink/flink-1.19.1-rc1/ /release/flink/flink-1.19.1/
Author: mapohl Date: Fri Jun 14 14:53:55 2024 New Revision: 69740 Log: Release Flink 1.19.1 Added: release/flink/flink-1.19.1/ - copied from r69739, dev/flink/flink-1.19.1-rc1/ Removed: dev/flink/flink-1.19.1-rc1/
(flink) 01/01: JavaDoc fix
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch FLINK-35550 in repository https://gitbox.apache.org/repos/asf/flink.git commit 04e7179738b5dfea6a6929ce74b08306e1f6e438 Author: Matthias Pohl AuthorDate: Tue Jun 11 15:35:19 2024 +0200 JavaDoc fix --- .../flink/runtime/scheduler/adaptive/DefaultRescaleManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java index 1a35734a02c..3aac9d9385d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultRescaleManager.java @@ -36,9 +36,9 @@ import java.time.temporal.Temporal; import java.util.function.Supplier; /** - * {@code DefaultRescaleManager} manages the rescaling in depending on time of the previous rescale - * operation and the available resources. It handles the event based on the following phases (in - * that order): + * {@code DefaultRescaleManager} manages triggering the next rescaling based on when the previous + * rescale operation happened and the available resources. It handles the event based on the + * following phases (in that order): * * * Cooldown phase: No rescaling takes place (its upper threshold is defined by {@code
(flink) branch FLINK-35550 created (now 04e7179738b)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch FLINK-35550 in repository https://gitbox.apache.org/repos/asf/flink.git at 04e7179738b JavaDoc fix This branch includes the following new commits: new 04e7179738b JavaDoc fix The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7d98ab060be82fe3684d15501b9eb83373303d18 Author: Matthias Pohl AuthorDate: Wed Jan 31 15:02:24 2024 +0100 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location --- .../test-scripts/test_file_sink.sh | 117 +++-- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 711f74b6672..7b7728b44f8 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -20,89 +20,96 @@ OUT_TYPE="${1:-local}" # other type: s3 SINK_TO_TEST="${2:-"StreamingFileSink"}" -S3_PREFIX=temp/test_file_sink-$(uuidgen) -OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" -S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" source "$(dirname "$0")"/common.sh -if [ "${OUT_TYPE}" == "s3" ]; then - source "$(dirname "$0")"/common_s3.sh -else - echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)." 
-fi - -# randomly set up openSSL with dynamically/statically linked libraries -OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) -echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" - -s3_setup hadoop -set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" -set_config_key "metrics.fetcher.update-interval" "2000" -# this test relies on global failovers -set_config_key "jobmanager.execution.failover-strategy" "full" - -mkdir -p $OUTPUT_PATH - -if [ "${OUT_TYPE}" == "local" ]; then - echo "Use local output" - JOB_OUTPUT_PATH=${OUTPUT_PATH} -elif [ "${OUT_TYPE}" == "s3" ]; then - echo "Use s3 output" - JOB_OUTPUT_PATH=${S3_OUTPUT_PATH} - set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk" - mkdir -p "$OUTPUT_PATH-chk" -else - echo "Unknown output type: ${OUT_TYPE}" - exit 1 -fi - -# make sure we delete the file at the end -function out_cleanup { - s3_delete_by_full_path_prefix "$S3_PREFIX" - s3_delete_by_full_path_prefix "${S3_PREFIX}-chk" - rollback_openssl_lib -} -if [ "${OUT_TYPE}" == "s3" ]; then - on_exit out_cleanup -fi +# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download folder for remote data +# the helper functions will access this folder +RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)" +LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}" +mkdir -p "${LOCAL_JOB_OUTPUT_PATH}" -TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" +# JOB_OUTPUT_PATH is the location where the job writes its data to +JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}" ### # Get all lines in part files and sort them numerically. 
# # Globals: -# OUTPUT_PATH +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi - find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g + find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then -get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then -s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: No S3 environment is loaded." +elif [ "${OUT_TYPE}" == "s3" ]; then + # the s3 context requires additional + source "$(dirname "$0")"/common_s3.sh + s3_setup hadoop
(flink) branch release-1.18 updated (9d0858ee745 -> 09f7b070989)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git from 9d0858ee745 [FLINK-34379][table] Fix OutOfMemoryError with large queries new 6f486e3d97d [hotfix][ci] Docker might return multiple port bindings new 7d98ab060be [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location new 1196e9937db [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 new 09f7b070989 [hotfix][tests] Removes warning that's produced when removing non-existing files The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../test-scripts/common_s3_minio.sh| 4 +- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- .../test-scripts/test_file_sink.sh | 118 +++-- 3 files changed, 66 insertions(+), 58 deletions(-)
(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 6f486e3d97dbb0b6d3c09c6085f6f6bab04977b1 Author: Matthias Pohl AuthorDate: Wed Feb 7 11:42:32 2024 +0100 [hotfix][ci] Docker might return multiple port bindings Adding the grep will work around this issue. --- flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh index a56ce4e9410..d3d08392caa 100644 --- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh +++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh @@ -54,7 +54,7 @@ function s3_start { while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do sleep 0.1 done - export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed s'/0\.0\.0\.0/localhost/')" + export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')" echo "Started minio @ $S3_ENDPOINT" on_exit s3_stop } @@ -115,4 +115,4 @@ function s3_setup_with_provider { set_config_key "s3.path-style-access" "true" } -source "$(dirname "$0")"/common_s3_operations.sh \ No newline at end of file +source "$(dirname "$0")"/common_s3_operations.sh
(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 1196e9937db2f8866ef2c0ef896ef8056798337e Author: Matthias Pohl AuthorDate: Wed Feb 7 11:46:45 2024 +0100 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 --- flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 7b7728b44f8..204c6442b7e 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -62,8 +62,7 @@ function get_total_number_of_valid_lines { if [ "${OUT_TYPE}" == "local" ]; then echo "[INFO] Test run in local environment: No S3 environment is loaded." elif [ "${OUT_TYPE}" == "s3" ]; then - # the s3 context requires additional - source "$(dirname "$0")"/common_s3.sh + source "$(dirname "$0")"/common_s3_minio.sh s3_setup hadoop # overwrites JOB_OUTPUT_PATH to point to S3 @@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then function out_cleanup { s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" -rollback_openssl_lib } on_exit out_cleanup @@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" +# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback at the end of the test run +on_exit rollback_openssl_lib + set_config_key "metrics.fetcher.update-interval" "2000" # this test relies on global failovers set_config_key "jobmanager.execution.failover-strategy" "full"
(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 09f7b070989a906d777a000e6ec3d9b45e192a29 Author: Matthias Pohl AuthorDate: Tue Apr 2 19:01:25 2024 +0200 [hotfix][tests] Removes warning that's produced when removing non-existing files --- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh b/flink-end-to-end-tests/test-scripts/common_ssl.sh index 2d99e29c4f5..8d4bc50b0a3 100644 --- a/flink-end-to-end-tests/test-scripts/common_ssl.sh +++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh @@ -136,5 +136,5 @@ function set_conf_ssl { } function rollback_openssl_lib() { - rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar + rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar }
(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 37bcfb867b652a2c970c5e552954100fb2c4dee9 Author: Matthias Pohl AuthorDate: Wed Feb 7 11:46:45 2024 +0100 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 --- flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 7b7728b44f8..204c6442b7e 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -62,8 +62,7 @@ function get_total_number_of_valid_lines { if [ "${OUT_TYPE}" == "local" ]; then echo "[INFO] Test run in local environment: No S3 environment is loaded." elif [ "${OUT_TYPE}" == "s3" ]; then - # the s3 context requires additional - source "$(dirname "$0")"/common_s3.sh + source "$(dirname "$0")"/common_s3_minio.sh s3_setup hadoop # overwrites JOB_OUTPUT_PATH to point to S3 @@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then function out_cleanup { s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" -rollback_openssl_lib } on_exit out_cleanup @@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" +# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback at the end of the test run +on_exit rollback_openssl_lib + set_config_key "metrics.fetcher.update-interval" "2000" # this test relies on global failovers set_config_key "jobmanager.execution.failover-strategy" "full"
(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 95fb6c1e363c0e9c7f6aebeb66d8ceb7a33cb9d8 Author: Matthias Pohl AuthorDate: Wed Feb 7 11:42:32 2024 +0100 [hotfix][ci] Docker might return multiple port bindings Adding the grep will work around this issue. --- flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh index a56ce4e9410..d3d08392caa 100644 --- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh +++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh @@ -54,7 +54,7 @@ function s3_start { while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do sleep 0.1 done - export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed s'/0\.0\.0\.0/localhost/')" + export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')" echo "Started minio @ $S3_ENDPOINT" on_exit s3_stop } @@ -115,4 +115,4 @@ function s3_setup_with_provider { set_config_key "s3.path-style-access" "true" } -source "$(dirname "$0")"/common_s3_operations.sh \ No newline at end of file +source "$(dirname "$0")"/common_s3_operations.sh
(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 500e92c9eba44f8f79eacf2dbc2bc714b8f66399 Author: Matthias Pohl AuthorDate: Tue Apr 2 19:01:25 2024 +0200 [hotfix][tests] Removes warning that's produced when removing non-existing files --- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh b/flink-end-to-end-tests/test-scripts/common_ssl.sh index 2d99e29c4f5..8d4bc50b0a3 100644 --- a/flink-end-to-end-tests/test-scripts/common_ssl.sh +++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh @@ -136,5 +136,5 @@ function set_conf_ssl { } function rollback_openssl_lib() { - rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar + rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar }
(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8707c63ee147085671a9ae1b294854bac03fc914 Author: Matthias Pohl AuthorDate: Wed Jan 31 15:02:24 2024 +0100 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location --- .../test-scripts/test_file_sink.sh | 117 +++-- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 711f74b6672..7b7728b44f8 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -20,89 +20,96 @@ OUT_TYPE="${1:-local}" # other type: s3 SINK_TO_TEST="${2:-"StreamingFileSink"}" -S3_PREFIX=temp/test_file_sink-$(uuidgen) -OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" -S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" source "$(dirname "$0")"/common.sh -if [ "${OUT_TYPE}" == "s3" ]; then - source "$(dirname "$0")"/common_s3.sh -else - echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)." 
-fi - -# randomly set up openSSL with dynamically/statically linked libraries -OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) -echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" - -s3_setup hadoop -set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" -set_config_key "metrics.fetcher.update-interval" "2000" -# this test relies on global failovers -set_config_key "jobmanager.execution.failover-strategy" "full" - -mkdir -p $OUTPUT_PATH - -if [ "${OUT_TYPE}" == "local" ]; then - echo "Use local output" - JOB_OUTPUT_PATH=${OUTPUT_PATH} -elif [ "${OUT_TYPE}" == "s3" ]; then - echo "Use s3 output" - JOB_OUTPUT_PATH=${S3_OUTPUT_PATH} - set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk" - mkdir -p "$OUTPUT_PATH-chk" -else - echo "Unknown output type: ${OUT_TYPE}" - exit 1 -fi - -# make sure we delete the file at the end -function out_cleanup { - s3_delete_by_full_path_prefix "$S3_PREFIX" - s3_delete_by_full_path_prefix "${S3_PREFIX}-chk" - rollback_openssl_lib -} -if [ "${OUT_TYPE}" == "s3" ]; then - on_exit out_cleanup -fi +# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download folder for remote data +# the helper functions will access this folder +RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)" +LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}" +mkdir -p "${LOCAL_JOB_OUTPUT_PATH}" -TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" +# JOB_OUTPUT_PATH is the location where the job writes its data to +JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}" ### # Get all lines in part files and sort them numerically. 
# # Globals: -# OUTPUT_PATH +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi - find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g + find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then -get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then -s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: No S3 environment is loaded." +elif [ "${OUT_TYPE}" == "s3" ]; then + # the s3 context requires additional + source "$(dirname "$0")"/common_s3.sh + s3_setup hadoop
(flink) branch release-1.19 updated (17e7c3eaf14 -> 500e92c9eba)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git from 17e7c3eaf14 [FLINK-35184][table-runtime] Fix mini-batch join hash collision when use InputSideHasNoUniqueKeyBundle (#24749) new 95fb6c1e363 [hotfix][ci] Docker might return multiple port bindings new 8707c63ee14 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location new 37bcfb867b6 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 new 500e92c9eba [hotfix][tests] Removes warning that's produced when removing non-existing files The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../test-scripts/common_s3_minio.sh| 4 +- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- .../test-scripts/test_file_sink.sh | 118 +++-- 3 files changed, 66 insertions(+), 58 deletions(-)
(flink) 02/04: [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 93526c2f3247598ce80854cf65dd4440eb5aaa43 Author: Matthias Pohl AuthorDate: Wed Jan 31 15:02:24 2024 +0100 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location --- .../test-scripts/test_file_sink.sh | 117 +++-- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 711f74b6672..7b7728b44f8 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -20,89 +20,96 @@ OUT_TYPE="${1:-local}" # other type: s3 SINK_TO_TEST="${2:-"StreamingFileSink"}" -S3_PREFIX=temp/test_file_sink-$(uuidgen) -OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" -S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" source "$(dirname "$0")"/common.sh -if [ "${OUT_TYPE}" == "s3" ]; then - source "$(dirname "$0")"/common_s3.sh -else - echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)." 
-fi - -# randomly set up openSSL with dynamically/statically linked libraries -OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) -echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" - -s3_setup hadoop -set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" -set_config_key "metrics.fetcher.update-interval" "2000" -# this test relies on global failovers -set_config_key "jobmanager.execution.failover-strategy" "full" - -mkdir -p $OUTPUT_PATH - -if [ "${OUT_TYPE}" == "local" ]; then - echo "Use local output" - JOB_OUTPUT_PATH=${OUTPUT_PATH} -elif [ "${OUT_TYPE}" == "s3" ]; then - echo "Use s3 output" - JOB_OUTPUT_PATH=${S3_OUTPUT_PATH} - set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk" - mkdir -p "$OUTPUT_PATH-chk" -else - echo "Unknown output type: ${OUT_TYPE}" - exit 1 -fi - -# make sure we delete the file at the end -function out_cleanup { - s3_delete_by_full_path_prefix "$S3_PREFIX" - s3_delete_by_full_path_prefix "${S3_PREFIX}-chk" - rollback_openssl_lib -} -if [ "${OUT_TYPE}" == "s3" ]; then - on_exit out_cleanup -fi +# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download folder for remote data +# the helper functions will access this folder +RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)" +LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}" +mkdir -p "${LOCAL_JOB_OUTPUT_PATH}" -TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" +# JOB_OUTPUT_PATH is the location where the job writes its data to +JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}" ### # Get all lines in part files and sort them numerically. 
# # Globals: -# OUTPUT_PATH +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # sorted content of part files ### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then -s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi - find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g + find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } ### # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# LOCAL_JOB_OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then -get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then -s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: No S3 environment is loaded." +elif [ "${OUT_TYPE}" == "s3" ]; then + # the s3 context requires additional + source "$(dirname "$0")"/common_s3.sh + s3_setup hadoop
(flink) branch master updated (4fe66e06974 -> 046872cc8dd)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 4fe66e06974 [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework new b2cdb4aac98 [hotfix][ci] Docker might return multiple port bindings new 93526c2f324 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location new 91e6e7eb913 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 new 046872cc8dd [hotfix][tests] Removes warning that's produced when removing non-existing files The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../test-scripts/common_s3_minio.sh| 4 +- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- .../test-scripts/test_file_sink.sh | 118 +++-- 3 files changed, 66 insertions(+), 58 deletions(-)
(flink) 04/04: [hotfix][tests] Removes warning that's produced when removing non-existing files
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 046872cc8ddf9af14af1f797200065944fca5b64 Author: Matthias Pohl AuthorDate: Tue Apr 2 19:01:25 2024 +0200 [hotfix][tests] Removes warning that's produced when removing non-existing files --- flink-end-to-end-tests/test-scripts/common_ssl.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/test-scripts/common_ssl.sh b/flink-end-to-end-tests/test-scripts/common_ssl.sh index 2d99e29c4f5..8d4bc50b0a3 100644 --- a/flink-end-to-end-tests/test-scripts/common_ssl.sh +++ b/flink-end-to-end-tests/test-scripts/common_ssl.sh @@ -136,5 +136,5 @@ function set_conf_ssl { } function rollback_openssl_lib() { - rm $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar + rm -f $FLINK_DIR/lib/flink-shaded-netty-tcnative-{dynamic,static}-*.jar }
(flink) 03/04: [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 91e6e7eb913043371b032b00012c5d87b87fa3bc Author: Matthias Pohl AuthorDate: Wed Feb 7 11:46:45 2024 +0100 [FLINK-34508][ci] Makes FileSink e2e tests run with Minio instead of S3 --- flink-end-to-end-tests/test-scripts/test_file_sink.sh | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 7b7728b44f8..204c6442b7e 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -62,8 +62,7 @@ function get_total_number_of_valid_lines { if [ "${OUT_TYPE}" == "local" ]; then echo "[INFO] Test run in local environment: No S3 environment is loaded." elif [ "${OUT_TYPE}" == "s3" ]; then - # the s3 context requires additional - source "$(dirname "$0")"/common_s3.sh + source "$(dirname "$0")"/common_s3_minio.sh s3_setup hadoop # overwrites JOB_OUTPUT_PATH to point to S3 @@ -90,7 +89,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then function out_cleanup { s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" -rollback_openssl_lib } on_exit out_cleanup @@ -104,6 +102,9 @@ OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" +# set_conf_ssl moves netty libraries into FLINK_DIR which we want to rollback at the end of the test run +on_exit rollback_openssl_lib + set_config_key "metrics.fetcher.update-interval" "2000" # this test relies on global failovers set_config_key "jobmanager.execution.failover-strategy" "full"
(flink) 01/04: [hotfix][ci] Docker might return multiple port bindings
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b2cdb4aac98b1858ba63596e6c6a3bddf3f274da Author: Matthias Pohl AuthorDate: Wed Feb 7 11:42:32 2024 +0100 [hotfix][ci] Docker might return multiple port bindings Adding the grep will work around this issue. --- flink-end-to-end-tests/test-scripts/common_s3_minio.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh index a56ce4e9410..d3d08392caa 100644 --- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh +++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh @@ -54,7 +54,7 @@ function s3_start { while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do sleep 0.1 done - export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed s'/0\.0\.0\.0/localhost/')" + export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')" echo "Started minio @ $S3_ENDPOINT" on_exit s3_stop } @@ -115,4 +115,4 @@ function s3_setup_with_provider { set_config_key "s3.path-style-access" "true" } -source "$(dirname "$0")"/common_s3_operations.sh \ No newline at end of file +source "$(dirname "$0")"/common_s3_operations.sh
(flink) branch release-1.18 updated: [FLINK-35000][build] Updates link to test code convention in pull request template
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new 9150f93b18b [FLINK-35000][build] Updates link to test code convention in pull request template 9150f93b18b is described below commit 9150f93b18b8694646092a6ed24a14e3653f613f Author: Matthias Pohl AuthorDate: Wed Apr 3 14:08:27 2024 +0200 [FLINK-35000][build] Updates link to test code convention in pull request template --- .github/PULL_REQUEST_TEMPLATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 8b0cbf71ea8..2427fd4f01f 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -39,7 +39,7 @@ ## Verifying this change -Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing +Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)*
(flink) branch release-1.19 updated: [FLINK-35000][build] Updates link to test code convention in pull request template
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new eb58599b434 [FLINK-35000][build] Updates link to test code convention in pull request template eb58599b434 is described below commit eb58599b434b6c5fe86f6e487ce88315c98b4ec3 Author: Matthias Pohl AuthorDate: Wed Apr 3 14:08:27 2024 +0200 [FLINK-35000][build] Updates link to test code convention in pull request template --- .github/PULL_REQUEST_TEMPLATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 8b0cbf71ea8..2427fd4f01f 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -39,7 +39,7 @@ ## Verifying this change -Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing +Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)*
(flink) branch master updated: [FLINK-35000][build] Updates link to test code convention in pull request template
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d301839dfe2 [FLINK-35000][build] Updates link to test code convention in pull request template d301839dfe2 is described below commit d301839dfe2ed9b1313d23f8307bda76868a0c0a Author: Matthias Pohl AuthorDate: Wed Apr 3 14:08:27 2024 +0200 [FLINK-35000][build] Updates link to test code convention in pull request template --- .github/PULL_REQUEST_TEMPLATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 8b0cbf71ea8..2427fd4f01f 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -39,7 +39,7 @@ ## Verifying this change -Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing +Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)*
(flink) branch release-1.19 updated: [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new ece4faee055 [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain ece4faee055 is described below commit ece4faee055b3797b39e9c0b55f3e94a3db2f912 Author: Jiabao Sun AuthorDate: Tue Jan 2 14:33:17 2024 +0800 [FLINK-33816][streaming] Fix unstable test SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain --- .../org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 29bcf6b5990..5e47d36040b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -708,7 +708,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { harness.streamTask.runMailboxLoop(); harness.finishProcessing(); -assertThat(triggerResult.isDone()).isTrue(); assertThat(triggerResult.get()).isTrue(); assertThat(checkpointCompleted.isDone()).isTrue(); }
(flink) branch release-1.19 updated: [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new c11656a2406 [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored c11656a2406 is described below commit c11656a2406f07e2ae7cd6f80c46afb14385ee0e Author: Matthias Pohl AuthorDate: Mon Mar 25 11:33:13 2024 +0100 [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored --- .../JobMasterServiceLeadershipRunnerTest.java | 24 +- .../jobmaster/TestingJobMasterServiceProcess.java | 4 +++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index a088d3f32c1..ee5cbcecc1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -499,11 +499,33 @@ class JobMasterServiceLeadershipRunnerTest { leaderElection.notLeader(); +assertThat(jobManagerRunner.getResultFuture()) +.as("The runner result should not be completed by the leadership revocation.") +.isNotDone(); + resultFuture.complete( JobManagerRunnerResult.forSuccess( createFailedExecutionGraphInfo(new FlinkException("test exception"; -assertThatFuture(jobManagerRunner.getResultFuture()).eventuallyFails(); +assertThat(jobManagerRunner.getResultFuture()) +.as("The runner result should not be completed if the leadership is lost.") +.isNotDone(); + +jobManagerRunner.closeAsync().get(); + +assertThatFuture(jobManagerRunner.getResultFuture()) +.eventuallySucceeds() +.as( 
+"The runner result should be completed with a SUSPENDED job status if the job didn't finish when closing the runner, yet.") +.satisfies( +result -> { +assertThat(result.isSuccess()).isTrue(); +assertThat( +result.getExecutionGraphInfo() + .getArchivedExecutionGraph() +.getState()) +.isEqualTo(JobStatus.SUSPENDED); +}); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java index c075d88fa09..8932bc8dd22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.util.concurrent.FutureUtils; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -87,7 +88,8 @@ public class TestingJobMasterServiceProcess implements JobMasterServiceProcess { public static final class Builder { private UUID leaderSessionId = UUID.randomUUID(); -private Supplier> closeAsyncSupplier = unsupportedOperation(); +private Supplier> closeAsyncSupplier = +FutureUtils::completedVoidFuture; private Supplier isInitializedAndRunningSupplier = unsupportedOperation(); private Supplier> getJobMasterGatewayFutureSupplier = () ->
(flink) branch release-1.18 updated (a6aa569f500 -> 94d1363c27e)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git from a6aa569f500 [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. add 94d1363c27e [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored No new revisions were added by this update. Summary of changes: .../JobMasterServiceLeadershipRunnerTest.java | 24 +- .../jobmaster/TestingJobMasterServiceProcess.java | 4 +++- 2 files changed, 26 insertions(+), 2 deletions(-)
(flink) branch master updated: [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1668a072769 [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored 1668a072769 is described below commit 1668a07276929416469392a35a77ba7699aac30b Author: Matthias Pohl AuthorDate: Mon Mar 25 11:33:13 2024 +0100 [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored --- .../JobMasterServiceLeadershipRunnerTest.java | 24 +- .../jobmaster/TestingJobMasterServiceProcess.java | 4 +++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index a088d3f32c1..ee5cbcecc1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -499,11 +499,33 @@ class JobMasterServiceLeadershipRunnerTest { leaderElection.notLeader(); +assertThat(jobManagerRunner.getResultFuture()) +.as("The runner result should not be completed by the leadership revocation.") +.isNotDone(); + resultFuture.complete( JobManagerRunnerResult.forSuccess( createFailedExecutionGraphInfo(new FlinkException("test exception"; -assertThatFuture(jobManagerRunner.getResultFuture()).eventuallyFails(); +assertThat(jobManagerRunner.getResultFuture()) +.as("The runner result should not be completed if the leadership is lost.") +.isNotDone(); + +jobManagerRunner.closeAsync().get(); + +assertThatFuture(jobManagerRunner.getResultFuture()) +.eventuallySucceeds() +.as( +"The runner 
result should be completed with a SUSPENDED job status if the job didn't finish when closing the runner, yet.") +.satisfies( +result -> { +assertThat(result.isSuccess()).isTrue(); +assertThat( +result.getExecutionGraphInfo() + .getArchivedExecutionGraph() +.getState()) +.isEqualTo(JobStatus.SUSPENDED); +}); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java index c075d88fa09..8932bc8dd22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.util.concurrent.FutureUtils; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -87,7 +88,8 @@ public class TestingJobMasterServiceProcess implements JobMasterServiceProcess { public static final class Builder { private UUID leaderSessionId = UUID.randomUUID(); -private Supplier> closeAsyncSupplier = unsupportedOperation(); +private Supplier> closeAsyncSupplier = +FutureUtils::completedVoidFuture; private Supplier isInitializedAndRunningSupplier = unsupportedOperation(); private Supplier> getJobMasterGatewayFutureSupplier = () ->
(flink) branch master updated (f31c128bfc4 -> 83f82ab0c86)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from f31c128bfc4 [FLINK-34956][doc] Fix the config type wrong of Duration add 83f82ab0c86 [FLINK-33376][coordination] Extend ZooKeeper Curator configurations No new revisions were added by this update. Summary of changes: .../generated/high_availability_configuration.html | 18 .../configuration/HighAvailabilityOptions.java | 49 ++ .../apache/flink/runtime/util/ZooKeeperUtils.java | 40 ++ 3 files changed, 107 insertions(+)
(flink-docker) branch dev-1.18 updated (9a5b2d9 -> 6ec55b2)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch dev-1.18 in repository https://gitbox.apache.org/repos/asf/flink-docker.git from 9a5b2d9 [FLINK-34165] Update apache download url new d93d911 [FLINK-34419][docker] Adds JDK17 support new 69d1638 [hotfix][ci] Utilizes matrix strategy in GHA workflow new 6ec55b2 [hotfix][ci] Updates actions The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci.yml | 13 ++--- 1 file changed, 10 insertions(+), 3 deletions(-)
(flink-docker) 03/03: [hotfix][ci] Updates actions
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.18 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 6ec55b25d815f08953b6cb866ba1a77974b3cfd4 Author: morazow AuthorDate: Thu Feb 29 12:34:14 2024 +0100 [hotfix][ci] Updates actions --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 66d60a2..baec748 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: matrix: java_version: [ 8, 11, 17 ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: "Build images" run: | ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}"
(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 support
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.18 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit d93d911b015e535fc2b6f1426c3b36229ff3d02a Author: morazow AuthorDate: Thu Feb 29 12:32:00 2024 +0100 [FLINK-34419][docker] Adds JDK17 support --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f1bbb24..083b6f2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,5 +26,6 @@ jobs: run: | ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 - name: "Test images" run: testing/run_tests.sh
(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.18 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 69d16387bb9eb573a8f2750c1509f7b5e7cdce17 Author: morazow AuthorDate: Thu Feb 29 12:33:51 2024 +0100 [hotfix][ci] Utilizes matrix strategy in GHA workflow --- .github/workflows/ci.yml | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 083b6f2..66d60a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,15 +17,21 @@ name: "CI" on: [push, pull_request] +env: + FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + matrix: +java_version: [ 8, 11, 17 ] steps: - uses: actions/checkout@v3 - name: "Build images" run: | - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.18-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 + ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}" - name: "Test images" run: testing/run_tests.sh
(flink-docker) 03/03: [hotfix][ci] Updates actions
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.19 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit e96fc75c468135ca5ba31d7b2ced7796a18b44c1 Author: morazow AuthorDate: Thu Feb 29 12:28:15 2024 +0100 [hotfix][ci] Updates actions --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d789a8d..0001d45 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: matrix: java_version: [ 8, 11, 17, 21 ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: "Build images" run: | ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}"
(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.19 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 53909c0b193de539ab96c1798c0ad39b3ed4fc31 Author: morazow AuthorDate: Thu Feb 29 12:27:56 2024 +0100 [hotfix][ci] Utilizes matrix strategy in GHA workflow --- .github/workflows/ci.yml | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 40dc094..d789a8d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,16 +17,21 @@ name: "CI" on: [push, pull_request] +env: + FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + matrix: +java_version: [ 8, 11, 17, 21 ] steps: - uses: actions/checkout@v3 - name: "Build images" run: | - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 21 -n test-java21 + ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}" - name: "Test images" run: testing/run_tests.sh
(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 and JDK21 support
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-1.19 in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 67d7c46ed382a665e941f0cf1f1606d10f87dee5 Author: morazow AuthorDate: Thu Feb 29 12:25:00 2024 +0100 [FLINK-34419][docker] Adds JDK17 and JDK21 support --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a486206..40dc094 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,5 +26,7 @@ jobs: run: | ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.19-SNAPSHOT-bin-scala_2.12.tgz" -j 21 -n test-java21 - name: "Test images" run: testing/run_tests.sh
(flink-docker) branch dev-1.19 updated (7549f2d -> e96fc75)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch dev-1.19 in repository https://gitbox.apache.org/repos/asf/flink-docker.git from 7549f2d Add GPG key for 1.19.0 release new 67d7c46 [FLINK-34419][docker] Adds JDK17 and JDK21 support new 53909c0 [hotfix][ci] Utilizes matrix strategy in GHA workflow new e96fc75 [hotfix][ci] Updates actions The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci.yml | 13 ++--- 1 file changed, 10 insertions(+), 3 deletions(-)
(flink-docker) 03/03: [hotfix][ci] Updates actions
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit c350c6167033518011abd62b6791fc1b1e8efe2e Author: morazow AuthorDate: Thu Feb 29 12:18:49 2024 +0100 [hotfix][ci] Updates actions --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba75faa..3af25de 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: matrix: java_version: [ 8, 11, 17, 21 ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: "Build images" run: | ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}"
(flink-docker) branch dev-master updated (dd78da0 -> c350c61)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch dev-master in repository https://gitbox.apache.org/repos/asf/flink-docker.git from dd78da0 [hotfix] Improve docker-entrypoint.sh to Use Arrays for Configuration Parameters Instead of Eval and String Concatenation new 1460077 [FLINK-34419][docker] Adds JDK17 and JDK21 support new 4b77c1a [hotfix][ci] Utilizes matrix strategy in GHA workflow new c350c61 [hotfix][ci] Updates actions The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci.yml | 13 ++--- 1 file changed, 10 insertions(+), 3 deletions(-)
(flink-docker) 01/03: [FLINK-34419][docker] Adds JDK17 and JDK21 support
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 1460077743b29e17edd0a2d7efd3897fa097988d Author: morazow AuthorDate: Thu Feb 29 12:08:30 2024 +0100 [FLINK-34419][docker] Adds JDK17 and JDK21 support --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6e45a73..0c6c6d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,5 +26,7 @@ jobs: run: | ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 + ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 21 -n test-java21 - name: "Test images" run: testing/run_tests.sh
(flink-docker) 02/03: [hotfix][ci] Utilizes matrix strategy in GHA workflow
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch dev-master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 4b77c1a9bb2ffd362cafb79cc775db41e6cd45c6 Author: morazow AuthorDate: Thu Feb 29 12:17:46 2024 +0100 [hotfix][ci] Utilizes matrix strategy in GHA workflow --- .github/workflows/ci.yml | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0c6c6d6..ba75faa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,16 +17,21 @@ name: "CI" on: [push, pull_request] +env: + FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" + jobs: ci: +name: CI using JDK ${{ matrix.java_version }} runs-on: ubuntu-latest +strategy: + fail-fast: false + matrix: +java_version: [ 8, 11, 17, 21 ] steps: - uses: actions/checkout@v3 - name: "Build images" run: | - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 8 -n test-java8 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 11 -n test-java11 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 17 -n test-java17 - ./add-custom.sh -u "https://s3.amazonaws.com/flink-nightly/flink-1.20-SNAPSHOT-bin-scala_2.12.tgz" -j 21 -n test-java21 + ./add-custom.sh -u "$FLINK_TAR_URL" -j ${{ matrix.java_version }} -n "test-java${{ matrix.java_version }}" - name: "Test images" run: testing/run_tests.sh
(flink-docker) 01/02: [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ versions
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 9e0041a2c9dace4bf3f32815e3e24e24385b179b Author: morazow AuthorDate: Thu Feb 29 09:30:14 2024 +0100 [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ versions --- .github/workflows/snapshot.yml | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/snapshot.yml b/.github/workflows/snapshot.yml index 2389cd1..a262ecc 100644 --- a/.github/workflows/snapshot.yml +++ b/.github/workflows/snapshot.yml @@ -36,15 +36,18 @@ jobs: strategy: max-parallel: 1 matrix: -java_version: [8, 11] build: - flink_version: 1.20-SNAPSHOT +java_version: [8, 11, 17, 21] branch: dev-master - flink_version: 1.19-SNAPSHOT +java_version: [8, 11, 17, 21] branch: dev-1.19 - flink_version: 1.18-SNAPSHOT +java_version: [8, 11, 17] branch: dev-1.18 - flink_version: 1.17-SNAPSHOT +java_version: [8, 11] branch: dev-1.17 steps: - uses: actions/checkout@v3 @@ -63,7 +66,7 @@ jobs: - name: Log in to the Container registry uses: docker/login-action@v1 with: - registry: ghcr.io + registry: ${{ env.REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }}
(flink-docker) 02/02: [FLINK-34314][docker] Updated GitHub actions versions
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git commit 54c53458ad99bfb21acca66d5c6e91b5812c26ce Author: morazow AuthorDate: Thu Feb 29 09:32:16 2024 +0100 [FLINK-34314][docker] Updated GitHub actions versions --- .github/workflows/snapshot.yml | 13 + 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/.github/workflows/snapshot.yml b/.github/workflows/snapshot.yml index a262ecc..063efbf 100644 --- a/.github/workflows/snapshot.yml +++ b/.github/workflows/snapshot.yml @@ -50,21 +50,18 @@ jobs: java_version: [8, 11] branch: dev-1.17 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: ref: ${{ matrix.build.branch }} - name: Set up QEMU -uses: docker/setup-qemu-action@v1 -with: - image: tonistiigi/binfmt:latest - platforms: all +uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx -uses: docker/setup-buildx-action@v1 +uses: docker/setup-buildx-action@v3 - name: Log in to the Container registry -uses: docker/login-action@v1 +uses: docker/login-action@v3 with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} @@ -82,7 +79,7 @@ jobs: run: env - name: Build and push Docker images (supported platforms) -uses: docker/bake-action@v1.7.0 +uses: docker/bake-action@v4 with: files: | .github/workflows/docker-bake.hcl
(flink-docker) branch master updated (20017e8 -> 54c5345)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-docker.git from 20017e8 [FLINK-34701][release] Update docker-entrypoint.sh for 1.19.0 new 9e0041a [FLINK-34419][docker] Added support of JDK 17 & 21 for Flink 1.18+ versions new 54c5345 [FLINK-34314][docker] Updated GitHub actions versions The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/snapshot.yml | 20 ++-- 1 file changed, 10 insertions(+), 10 deletions(-)
(flink) branch release-1.18 updated: [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new a6aa569f500 [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. a6aa569f500 is described below commit a6aa569f5005041934a2e6398b6749584beeaabd Author: Matthias Pohl AuthorDate: Wed Mar 20 16:06:58 2024 +0100 [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. --- .../JobMasterServiceLeadershipRunnerTest.java | 27 ++ 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index dc78a9e9ed9..3688f6129fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -45,18 +45,19 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import 
org.apache.flink.util.function.SupplierWithException; import org.apache.flink.util.function.ThrowingRunnable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; @@ -676,26 +677,25 @@ class JobMasterServiceLeadershipRunnerTest { } } -@Disabled @Test void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip() throws Exception { final AtomicReference<LeaderInformationRegister> storedLeaderInformation = new AtomicReference<>(LeaderInformationRegister.empty()); +final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean(); final TestingLeaderElectionDriver.Factory driverFactory = new TestingLeaderElectionDriver.Factory( TestingLeaderElectionDriver.newBuilder( -new AtomicBoolean(), storedLeaderInformation, new AtomicBoolean())); +haBackendLeadershipFlag, +storedLeaderInformation, +new AtomicBoolean())); // we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner // in connection with the DefaultLeaderElectionService generates the nested locking final DefaultLeaderElectionService defaultLeaderElectionService = new DefaultLeaderElectionService(driverFactory, fatalErrorHandler); -final TestingLeaderElectionDriver currentLeaderDriver = -driverFactory.assertAndGetOnlyCreatedDriver(); - // latch to detect when we reached the first synchronized section having a lock on the // JobMasterServiceProcess#stop side final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch(); @@ -733,6 +733,7 @@ class JobMasterServiceLeadershipRunnerTest { // before calling stop on the // DefaultLeaderElectionService triggerClassLoaderLeaseRelease.await(); + // In order to reproduce the deadlock, we // need to ensure that // leaderContender#grantLeadership can be @@ -770,13 +771,19 @@ class JobMasterServiceLeadershipRunnerTest { jobManagerRunner.start(); // grant leadership to create 
jobMasterServiceProcess +haBackendLeadershipFlag.set(true); final UUID leaderSessionID = UUID.randomUUID(); defaultLeaderElectionService.onGrantLeadership(leaderSessionID); -
(flink) branch release-1.19 updated: [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new 6b5c48ff53d [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. 6b5c48ff53d is described below commit 6b5c48ff53ddc6e75056a9050afded2ac44a413a Author: Matthias Pohl AuthorDate: Wed Mar 20 16:06:58 2024 +0100 [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. --- .../JobMasterServiceLeadershipRunnerTest.java | 27 ++ 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 5a6f82a75d5..a088d3f32c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -45,18 +45,19 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import 
org.apache.flink.util.function.SupplierWithException; import org.apache.flink.util.function.ThrowingRunnable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; @@ -676,26 +677,25 @@ class JobMasterServiceLeadershipRunnerTest { } } -@Disabled @Test void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip() throws Exception { final AtomicReference<LeaderInformationRegister> storedLeaderInformation = new AtomicReference<>(LeaderInformationRegister.empty()); +final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean(); final TestingLeaderElectionDriver.Factory driverFactory = new TestingLeaderElectionDriver.Factory( TestingLeaderElectionDriver.newBuilder( -new AtomicBoolean(), storedLeaderInformation, new AtomicBoolean())); +haBackendLeadershipFlag, +storedLeaderInformation, +new AtomicBoolean())); // we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner // in connection with the DefaultLeaderElectionService generates the nested locking final DefaultLeaderElectionService defaultLeaderElectionService = new DefaultLeaderElectionService(driverFactory, fatalErrorHandler); -final TestingLeaderElectionDriver currentLeaderDriver = -driverFactory.assertAndGetOnlyCreatedDriver(); - // latch to detect when we reached the first synchronized section having a lock on the // JobMasterServiceProcess#stop side final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch(); @@ -733,6 +733,7 @@ class JobMasterServiceLeadershipRunnerTest { // before calling stop on the // DefaultLeaderElectionService triggerClassLoaderLeaseRelease.await(); + // In order to reproduce the deadlock, we // need to ensure that // leaderContender#grantLeadership can be @@ -770,13 +771,19 @@ class JobMasterServiceLeadershipRunnerTest { jobManagerRunner.start(); // grant leadership to create 
jobMasterServiceProcess +haBackendLeadershipFlag.set(true); final UUID leaderSessionID = UUID.randomUUID(); defaultLeaderElectionService.onGrantLeadership(leaderSessionID); -
(flink) branch master updated (d4c1a0a1ba4 -> 0e70d89ad9f)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from d4c1a0a1ba4 [FLINK-34643] Use AdaptiveScheduler in JobIDLoggingITCase add 0e70d89ad9f [FLINK-34897][test] Enables JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip again. No new revisions were added by this update. Summary of changes: .../JobMasterServiceLeadershipRunnerTest.java | 27 ++ 1 file changed, 17 insertions(+), 10 deletions(-)
(flink) 04/04: [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit f2a6ff5a97bf27d68be1188c05158e18df810549 Author: Matthias Pohl AuthorDate: Wed Feb 7 16:34:19 2024 +0100 [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler --- flink-end-to-end-tests/run-nightly-tests.sh | 46 ++--- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index f6d45598088..e6d088133ea 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -125,30 +125,28 @@ function run_group_1 { # Docker / Container / Kubernetes tests -if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then -run_test "Wordcount on Docker test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh dummy-fs" - -run_test "Run Kubernetes test" "$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh" -run_test "Run kubernetes session test (default input)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh" -run_test "Run kubernetes session test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs" -run_test "Run kubernetes application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh" -run_test "Run kubernetes application HA test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh" -run_test "Run Kubernetes IT test" "$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh" - -run_test "Running Flink over NAT end-to-end test" "$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions" - -if [[ `uname -i` != 'aarch64' ]]; then -# Skip PyFlink e2e test, because MiniConda and Pyarrow which Pyflink depends doesn't support aarch64 currently. 
-run_test "Run kubernetes pyflink application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_pyflink_application.sh" - -# Hadoop YARN deosn't support aarch64 at this moment. See: https://issues.apache.org/jira/browse/HADOOP-16723 -# These tests are known to fail on JDK11. See FLINK-13719 -if [[ ${PROFILE} != *"jdk11"* ]]; then -run_test "Running Kerberized YARN per-job on Docker test (default input)" "$END_TO_END_DIR/test-scripts/test_yarn_job_kerberos_docker.sh" -run_test "Running Kerberized YARN per-job on Docker test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_yarn_job_kerberos_docker.sh dummy-fs" -run_test "Running Kerberized YARN application on Docker test (default input)" "$END_TO_END_DIR/test-scripts/test_yarn_application_kerberos_docker.sh" -run_test "Running Kerberized YARN application on Docker test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_yarn_application_kerberos_docker.sh dummy-fs" -fi +run_test "Wordcount on Docker test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh dummy-fs" + +run_test "Run Kubernetes test" "$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh" +run_test "Run kubernetes session test (default input)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh" +run_test "Run kubernetes session test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs" +run_test "Run kubernetes application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh" +run_test "Run kubernetes application HA test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh" +run_test "Run Kubernetes IT test" "$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh" + +run_test "Running Flink over NAT end-to-end test" "$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions" + +if [[ `uname -i` != 'aarch64' ]]; then +# Skip PyFlink e2e test, because MiniConda and Pyarrow which Pyflink depends doesn't support aarch64 currently. 
+run_test "Run kubernetes pyflink application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_pyflink_application.sh" + +# Hadoop YARN doesn't
(flink) branch release-1.18 updated (940b3bbda5b -> f2a6ff5a97b)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git from 940b3bbda5b [FLINK-32513][core] Add predecessor caching new 8f6890fbd75 [FLINK-21400][ci] Enables FileSink and Stateful stream job e2e test for the AdaptiveScheduler new f5c243097ac [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler new 836b332b2d1 [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler new f2a6ff5a97b [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../file/src/FileSourceTextLinesITCase.java| 2 - flink-end-to-end-tests/run-nightly-tests.sh| 60 ++ .../flink/runtime/jobmaster/JobMasterTest.java | 4 +- .../checkpointing/UnalignedCheckpointITCase.java | 3 -- .../checkpointing/UnalignedCheckpointTestBase.java | 12 - .../DefaultSchedulerLocalRecoveryITCase.java | 9 +++- 6 files changed, 48 insertions(+), 42 deletions(-)
(flink) 03/04: [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit 836b332b2d100e21b1d0008257a009d9ec09e13a Author: Matthias Pohl AuthorDate: Wed Feb 7 15:59:11 2024 +0100 [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler --- .../flink/test/checkpointing/UnalignedCheckpointITCase.java | 3 --- .../test/checkpointing/UnalignedCheckpointTestBase.java | 12 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index c290ba0e39f..1f587dbf767 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -38,12 +38,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.Collector; import org.apache.commons.lang3.ArrayUtils; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -104,7 +102,6 @@ import static org.hamcrest.Matchers.equalTo; * */ @RunWith(Parameterized.class) -@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { enum Topology implements DagCreator { PIPELINE { diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index a3da7f7733e..319df472ff8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -102,8 +102,16 @@ import static org.apache.flink.shaded.guava31.com.google.common.collect.Iterable import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT; import static org.apache.flink.util.Preconditions.checkState; -/** Base class for tests related to unaligned checkpoints. */ -@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689 +/** + * Base class for tests related to unaligned checkpoints. + * + * This test base relies on restarting the subtasks within the scheduler to trigger a reset of + * the operators. The operator reset is counted in the LongSource. The job will terminate if the + * number of expected restarts is reached. The AdaptiveScheduler won't trigger the operator reset + * resulting in the test running forever. This is why this test suite is disabled for the {@link + * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}. + */ +@Category(FailsWithAdaptiveScheduler.class) public abstract class UnalignedCheckpointTestBase extends TestLogger { protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class); protected static final String NUM_INPUTS = "inputs_";
(flink) 02/04: [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 00492630baa5cf041ea2cce2a3560f3e713bf57a Author: Matthias Pohl AuthorDate: Wed Feb 7 15:47:39 2024 +0100 [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler For a few tests, a proper explanation is added why the tests are still disabled --- .../flink/connector/file/src/FileSourceTextLinesITCase.java | 2 -- flink-end-to-end-tests/run-nightly-tests.sh | 2 +- .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 4 +++- .../flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java | 9 +++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java index 08d53f21426..f0ff3fb5cb2 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java @@ -42,7 +42,6 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -198,7 +197,6 @@ class FileSourceTextLinesITCase { * record format (text lines) and restarts TaskManager. 
*/ @Test -@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450 void testContinuousTextFileSourceWithTaskManagerFailover(@TempDir java.nio.file.Path tmpTestDir) throws Exception { // This test will kill TM, so we run it in a new cluster to avoid affecting other tests diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index b5000eb5d38..72146e38ecd 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -233,7 +233,7 @@ function run_group_2 { # Sticky Scheduling -if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450 +if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-34416 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false 100" "skip_check_exceptions" run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true 100" "skip_check_exceptions" run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false 100" "skip_check_exceptions" diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 5a84fb4f0e4..e8e63652283 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1019,7 +1019,9 @@ class JobMasterTest { * if this execution fails. */ @Test -@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450 +// The AdaptiveScheduler doesn't support partial recovery but restarts all Executions in case of +// a local failure. 
+@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") void testRequestNextInputSplitWithLocalFailover() throws Exception { configuration.set( diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java index 44db8abdec6..583e4341e65 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java @@ -60,13 +60,18 @@ public class DefaultSchedulerLocalRecoveryITCase extends TestLogger { private static final long TIMEOUT = 10_000L; @Test -
(flink) 03/04: [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7d107966dbe7e38e43680fabf3ffdfeaa71e8d3c Author: Matthias Pohl AuthorDate: Wed Feb 7 15:59:11 2024 +0100 [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler --- .../flink/test/checkpointing/UnalignedCheckpointITCase.java | 3 --- .../test/checkpointing/UnalignedCheckpointTestBase.java | 12 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index effbbc52fc8..f649b8f7df3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -38,12 +38,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; import org.apache.flink.util.Collector; import org.apache.commons.lang3.ArrayUtils; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -105,7 +103,6 @@ import static org.hamcrest.Matchers.equalTo; * */ @RunWith(Parameterized.class) -@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689 public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { enum Topology implements DagCreator { PIPELINE { diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index ea664da1772..e88f27fe1ce 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -104,8 +104,16 @@ import static org.apache.flink.shaded.guava31.com.google.common.collect.Iterable import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT; import static org.apache.flink.util.Preconditions.checkState; -/** Base class for tests related to unaligned checkpoints. */ -@Category(FailsWithAdaptiveScheduler.class) // FLINK-21689 +/** + * Base class for tests related to unaligned checkpoints. + * + * <p>This test base relies on restarting the subtasks within the scheduler to trigger a reset of + * the operators. The operator reset is counted in the LongSource. The job will terminate if the + * number of expected restarts is reached. The AdaptiveScheduler won't trigger the operator reset + * resulting in the test running forever. This is why this test suite is disabled for the {@link + * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}. + */ +@Category(FailsWithAdaptiveScheduler.class) public abstract class UnalignedCheckpointTestBase extends TestLogger { protected static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointTestBase.class); protected static final String NUM_INPUTS = "inputs_";
(flink) branch release-1.19 updated (5ec4bf2f181 -> f82ff7c656d)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git from 5ec4bf2f181 [FLINK-32513][core] Add predecessor caching new 4fc36e9abaa [FLINK-21400][ci] Enables FileSink and Stateful stream job e2e test for the AdaptiveScheduler new 00492630baa [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler new 7d107966dbe [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler new f82ff7c656d [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../file/src/FileSourceTextLinesITCase.java| 2 - flink-end-to-end-tests/run-nightly-tests.sh| 60 ++ .../flink/runtime/jobmaster/JobMasterTest.java | 4 +- .../checkpointing/UnalignedCheckpointITCase.java | 3 -- .../checkpointing/UnalignedCheckpointTestBase.java | 12 - .../DefaultSchedulerLocalRecoveryITCase.java | 9 +++- 6 files changed, 48 insertions(+), 42 deletions(-)
(flink) branch master updated (3a55a3ff750 -> 1aa35b95975)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3a55a3ff750 [FLINK-34902][table] Fix column mismatch IndexOutOfBoundsException exception add a1d17ccf0ee [FLINK-21400][ci] Enables FileSink and Stateful stream job e2e test for the AdaptiveScheduler add 8f06fb472ba [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler add 96142404c14 [FLINK-21535][test] Adds proper comment to test methods that are disabled for the AdaptiveScheduler add 1aa35b95975 [FLINK-34409][ci] Enable any still disabled e2e tests for the AdaptiveScheduler No new revisions were added by this update. Summary of changes: .../file/src/FileSourceTextLinesITCase.java| 2 - flink-end-to-end-tests/run-nightly-tests.sh| 60 ++ .../flink/runtime/jobmaster/JobMasterTest.java | 4 +- .../checkpointing/UnalignedCheckpointITCase.java | 3 -- .../checkpointing/UnalignedCheckpointTestBase.java | 12 - .../DefaultSchedulerLocalRecoveryITCase.java | 9 +++- 6 files changed, 48 insertions(+), 42 deletions(-)
(flink) 02/04: [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git commit f5c243097ac9fae29c3365a2361b7b0c6be3b3ee Author: Matthias Pohl AuthorDate: Wed Feb 7 15:47:39 2024 +0100 [FLINK-21450][test] Enables tests that were disabled for the AdaptiveScheduler For a few tests, a proper explanation is added why the tests are still disabled --- .../flink/connector/file/src/FileSourceTextLinesITCase.java | 2 -- flink-end-to-end-tests/run-nightly-tests.sh | 2 +- .../java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 4 +++- .../flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java | 9 +++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java index 01cbd8aa9c2..f1d65888fd8 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java @@ -37,7 +37,6 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -167,7 +166,6 @@ class FileSourceTextLinesITCase { * record format (text lines) and restarts TaskManager. 
*/ @Test -@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450 void testContinuousTextFileSourceWithTaskManagerFailover(@TempDir java.nio.file.Path tmpTestDir) throws Exception { // This test will kill TM, so we run it in a new cluster to avoid affecting other tests diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 8389442c29f..f6d45598088 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -231,7 +231,7 @@ function run_group_2 { # Sticky Scheduling -if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then #FLINK-21450 +if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-34416 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false false 100" "skip_check_exceptions" run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 hashmap false true 100" "skip_check_exceptions" run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 10 rocks false false 100" "skip_check_exceptions" diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 64d462a9090..bc15bcdacbc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1018,7 +1018,9 @@ class JobMasterTest { * if this execution fails. */ @Test -@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450 +// The AdaptiveScheduler doesn't support partial recovery but restarts all Executions in case of +// a local failure. 
+@Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") void testRequestNextInputSplitWithLocalFailover() throws Exception { configuration.setString( diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java index 93b55c83081..99a6e3b19fb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java @@ -60,13 +60,18 @@ public class DefaultSchedulerLocalRecoveryITCase extends TestLogger { private static final long TIMEOUT = 10_000L; @Test -