[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
gyfora commented on pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#issuecomment-1051721189 There are still some outstanding minor comments but after that I think its good to go +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
gyfora commented on pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#issuecomment-1051719280 Sounds good @tweise ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request #25: [FLINK-26356] Use the common method to construct service name
Aitozi opened a new pull request #25: URL: https://github.com/apache/flink-kubernetes-operator/pull/25 Use the common method to construct service name. Besides, the `RestOptions.ADDRESS` is always generated at the server side we can not get it from the client config. Since the flink-operator is always deployed with the flink job in the same cluster, we can directly use the `Servicename.Namespace` to talk to Flink job rest server. I'am not sure whether need to remove the `config.getString(RestOptions.ADDRESS, xx)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] tweise commented on pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
tweise commented on pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#issuecomment-1051663403 @gyfora this is working pretty well now, I also added a port readiness check. But the current implementation with control logic scattered over job status observer and reconcilers makes it pretty hard to implement this in a cleaner way. I'm working on some refactoring and hopefully that will lead to simplification. Hope to have it done soon to avoid larger merge conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink-statefun] branch master updated (725202c -> 788417d)
This is an automated email from the ASF dual-hosted git repository. igal pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git. from 725202c [FLINK-26340][statefun-golang-sdk] Add ability in Golang SDK to create new statefun.Context from existing one, but with a new underlying context.Context add 788417d [FLINK-26375][statefun-golang-sdk] Fix Statefun Golang SDK to return nil from Context.Caller when there is no caller No new revisions were added by this update. Summary of changes: statefun-sdk-go/v3/pkg/statefun/handler.go | 5 +- statefun-sdk-go/v3/pkg/statefun/handler_test.go | 89 + 2 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 statefun-sdk-go/v3/pkg/statefun/handler_test.go
[flink] branch master updated (66af14b -> 5c6a220)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 66af14b [FLINK-26287][table] Update some test classes for the new TableConfig.set add d4258b7 [FLINK-25249][connector/kafka] Reimplement KafkaTestEnvironment with KafkaContainer add fe9e13f [FLINK-26143][connector/kafka] Temporarily disable FlinkKafkaProducerITCase.testFlinkKafkaProducerFailBeforeNotify because it hangs in IDE and CI add 5c6a220 [FLINK-25249][connector/kafka] Use 1 broker in Kafka tests to reduce complexity of environment No new revisions were added by this update. Summary of changes: .../flink/connector/kafka/testutils/KafkaUtil.java | 17 +- .../kafka/FlinkKafkaInternalProducerITCase.java| 31 +- .../connectors/kafka/FlinkKafkaProducerITCase.java | 31 +- .../connectors/kafka/KafkaConsumerTestBase.java| 79 ++--- .../streaming/connectors/kafka/KafkaTestBase.java | 2 +- .../connectors/kafka/KafkaTestEnvironment.java | 7 +- .../connectors/kafka/KafkaTestEnvironmentImpl.java | 363 +++-- .../org/apache/flink/util/DockerImageVersions.java | 2 + 8 files changed, 235 insertions(+), 297 deletions(-)
[flink] 02/02: [FLINK-26287][table] Update some test classes for the new TableConfig.set
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 66af14beb296797a3e35a2d338e86d846ac130df Author: Timo Walther AuthorDate: Tue Feb 22 17:30:54 2022 +0100 [FLINK-26287][table] Update some test classes for the new TableConfig.set This closes #18895. --- .../file/table/FileSystemTableSinkTest.java| 8 +-- .../apache/flink/table/api/CompiledPlanITCase.java | 2 +- .../flink/table/planner/codegen/CodeSplitTest.java | 16 ++--- .../plan/batch/sql/ForwardHashExchangeTest.java| 70 +++--- .../nodes/exec/common/CommonExecSinkITCase.java| 18 ++ .../nodes/exec/operator/BatchOperatorNameTest.java | 10 +--- .../nodes/exec/operator/OperatorNameTestBase.java | 1 - .../exec/operator/StreamOperatorNameTest.java | 2 - .../exec/serde/ContextResolvedTableSerdeTest.java | 13 ++-- .../exec/stream/ChangelogSourceJsonPlanTest.java | 4 +- .../plan/nodes/exec/stream/ExpandJsonPlanTest.java | 9 +-- .../exec/stream/GroupAggregateJsonPlanTest.java| 13 +--- .../stream/IncrementalAggregateJsonPlanTest.java | 21 ++- .../exec/stream/WindowAggregateJsonPlanTest.java | 4 +- .../PushWatermarkIntoTableSourceScanRuleTest.java | 5 +- .../PushLocalAggIntoTableSourceScanRuleTest.java | 25 +++- .../batch/sql/ForwardHashExchangeITCase.java | 9 +-- .../sql/agg/LocalAggregatePushDownITCase.java | 14 ++--- .../jsonplan/DeduplicationJsonPlanITCase.java | 4 +- .../stream/jsonplan/ExpandJsonPlanITCase.java | 9 +-- .../jsonplan/GroupAggregateJsonPlanITCase.java | 13 +--- .../IncrementalAggregateJsonPlanITCase.java| 21 ++- .../stream/jsonplan/WindowAggregateJsonITCase.java | 7 +-- .../flink/table/api/TableEnvironmentITCase.scala | 8 +-- .../apache/flink/table/api/batch/ExplainTest.scala | 8 +-- .../flink/table/api/stream/ExplainTest.scala | 14 +++-- .../table/planner/catalog/CatalogTableITCase.scala | 3 +- .../table/planner/catalog/CatalogViewITCase.scala | 3 +- .../expressions/NonDeterministicTests.scala| 11 ++-- .../expressions/utils/ExpressionTestBase.scala | 2 +- .../plan/batch/sql/DagOptimizationTest.scala | 53 .../RemoveRedundantLocalSortAggRuleTest.scala | 10 ++-- .../table/PythonGroupWindowAggregateTest.scala | 4 +- .../plan/stream/table/TwoStageAggregateTest.scala | 14 ++--- .../GroupWindowTableAggregateValidationTest.scala | 4 +- .../planner/runtime/FileSystemITCaseBase.scala | 4 +- .../planner/runtime/batch/sql/CalcITCase.scala | 6 +- .../runtime/batch/sql/CodeSplitITCase.scala| 6 +- .../planner/runtime/batch/sql/DecimalITCase.scala | 28 - .../planner/runtime/batch/sql/MiscITCase.scala | 2 +- .../runtime/batch/sql/MultipleInputITCase.scala| 10 ++-- .../batch/sql/PartitionableSinkITCase.scala| 4 +- .../runtime/batch/sql/SortLimitITCase.scala| 3 +- .../runtime/batch/sql/TableSinkITCase.scala| 6 +- .../runtime/batch/sql/TableSourceITCase.scala | 5 +- .../planner/runtime/batch/sql/UnionITCase.scala| 10 ++-- .../sql/agg/AggregateJoinTransposeITCase.scala | 2 +- .../sql/agg/AggregateReduceGroupingITCase.scala| 17 +++--- .../runtime/batch/sql/agg/HashAggITCase.scala | 2 +- .../sql/agg/HashDistinctAggregateITCase.scala | 2 +- .../runtime/batch/sql/agg/SortAggITCase.scala | 6 +- .../sql/agg/SortDistinctAggregateITCase.scala | 2 +- .../runtime/batch/sql/join/InnerJoinITCase.scala | 5 +- .../runtime/batch/sql/join/JoinITCaseHelper.scala | 13 ++-- .../runtime/batch/table/DecimalITCase.scala| 28 - .../harness/GroupAggregateHarnessTest.scala| 8 +-- .../harness/WindowAggregateHarnessTest.scala | 3 +- .../planner/runtime/stream/sql/CalcITCase.scala| 2 +- .../runtime/stream/sql/ChangelogSourceITCase.scala | 4 +- .../runtime/stream/sql/DeduplicateITCase.scala | 10 ++-- .../runtime/stream/sql/GroupWindowITCase.scala | 15 ++--- .../planner/runtime/stream/sql/SortITCase.scala| 25 .../runtime/stream/sql/SplitAggregateITCase.scala | 8 +-- .../runtime/stream/sql/TemporalJoinITCase.scala| 12 ++-- .../runtime/stream/sql/WindowAggregateITCase.scala | 2 +- .../stream/sql/WindowDistinctAggregateITCase.scala | 5 +- .../runtime/stream/table/TableSinkITCase.scala | 3 +- .../planner/runtime/utils/BatchTestBase.scala | 6 +- .../runtime/utils/StreamingWithAggTestBase.scala | 4 +- .../flink/table/planner/utils/TableTestBase.scala | 11 ++-- 70 files changed, 286 insertions(+), 425 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java
[flink] branch master updated (44da40f -> 66af14b)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 44da40f [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest new d2e9473 [FLINK-26287][table] Add convenience method TableConfig.set new 66af14b [FLINK-26287][table] Update some test classes for the new TableConfig.set The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../docs/dev/python/dependency_management.md | 2 +- docs/content.zh/docs/dev/table/config.md | 24 .../docs/dev/table/sql/queries/overview.md | 12 ++-- docs/content.zh/docs/dev/table/tuning.md | 41 ++--- .../docs/dev/python/dependency_management.md | 2 +- docs/content/docs/dev/table/config.md | 24 .../content/docs/dev/table/sql/queries/overview.md | 8 +-- docs/content/docs/dev/table/tuning.md | 69 ++--- flink-python/pyflink/table/table_config.py | 9 +++ .../org/apache/flink/table/api/TableConfig.java| 49 +-- .../file/table/FileSystemTableSinkTest.java| 8 +-- .../apache/flink/table/api/CompiledPlanITCase.java | 2 +- .../flink/table/planner/codegen/CodeSplitTest.java | 16 ++--- .../plan/batch/sql/ForwardHashExchangeTest.java| 70 +++--- .../nodes/exec/common/CommonExecSinkITCase.java| 18 ++ .../nodes/exec/operator/BatchOperatorNameTest.java | 10 +--- .../nodes/exec/operator/OperatorNameTestBase.java | 1 - .../exec/operator/StreamOperatorNameTest.java | 2 - .../exec/serde/ContextResolvedTableSerdeTest.java | 13 ++-- .../exec/stream/ChangelogSourceJsonPlanTest.java | 4 +- .../plan/nodes/exec/stream/ExpandJsonPlanTest.java | 9 +-- .../exec/stream/GroupAggregateJsonPlanTest.java| 13 +--- .../stream/IncrementalAggregateJsonPlanTest.java | 21 ++- .../exec/stream/WindowAggregateJsonPlanTest.java | 4 +- .../PushWatermarkIntoTableSourceScanRuleTest.java | 5 +- .../PushLocalAggIntoTableSourceScanRuleTest.java | 25 +++- .../batch/sql/ForwardHashExchangeITCase.java | 9 +-- .../sql/agg/LocalAggregatePushDownITCase.java | 14 ++--- .../jsonplan/DeduplicationJsonPlanITCase.java | 4 +- .../stream/jsonplan/ExpandJsonPlanITCase.java | 9 +-- .../jsonplan/GroupAggregateJsonPlanITCase.java | 13 +--- .../IncrementalAggregateJsonPlanITCase.java| 21 ++- .../stream/jsonplan/WindowAggregateJsonITCase.java | 7 +-- .../flink/table/api/TableEnvironmentITCase.scala | 8 +-- .../apache/flink/table/api/batch/ExplainTest.scala | 8 +-- .../flink/table/api/stream/ExplainTest.scala | 14 +++-- .../table/planner/catalog/CatalogTableITCase.scala | 3 +- .../table/planner/catalog/CatalogViewITCase.scala | 3 +- .../expressions/NonDeterministicTests.scala| 11 ++-- .../expressions/utils/ExpressionTestBase.scala | 2 +- .../plan/batch/sql/DagOptimizationTest.scala | 53 .../RemoveRedundantLocalSortAggRuleTest.scala | 10 ++-- .../table/PythonGroupWindowAggregateTest.scala | 4 +- .../plan/stream/table/TwoStageAggregateTest.scala | 14 ++--- .../GroupWindowTableAggregateValidationTest.scala | 4 +- .../planner/runtime/FileSystemITCaseBase.scala | 4 +- .../planner/runtime/batch/sql/CalcITCase.scala | 6 +- .../runtime/batch/sql/CodeSplitITCase.scala| 6 +- .../planner/runtime/batch/sql/DecimalITCase.scala | 28 - .../planner/runtime/batch/sql/MiscITCase.scala | 2 +- .../runtime/batch/sql/MultipleInputITCase.scala| 10 ++-- .../batch/sql/PartitionableSinkITCase.scala| 4 +- .../runtime/batch/sql/SortLimitITCase.scala| 3 +- .../runtime/batch/sql/TableSinkITCase.scala| 6 +- .../runtime/batch/sql/TableSourceITCase.scala | 5 +- .../planner/runtime/batch/sql/UnionITCase.scala| 10 ++-- .../sql/agg/AggregateJoinTransposeITCase.scala | 2 +- .../sql/agg/AggregateReduceGroupingITCase.scala| 17 +++--- .../runtime/batch/sql/agg/HashAggITCase.scala | 2 +- .../sql/agg/HashDistinctAggregateITCase.scala | 2 +- .../runtime/batch/sql/agg/SortAggITCase.scala | 6 +- .../sql/agg/SortDistinctAggregateITCase.scala | 2 +- .../runtime/batch/sql/join/InnerJoinITCase.scala | 5 +- .../runtime/batch/sql/join/JoinITCaseHelper.scala | 13 ++-- .../runtime/batch/table/DecimalITCase.scala| 28 - .../harness/GroupAggregateHarnessTest.scala| 8 +-- .../harness/WindowAggregateHarnessTest.scala | 3 +- .../planner/runtime/stream/sql/CalcITCase.scala| 2 +- .../runtime/stream/sql/ChangelogSourceITCase.scala | 4 +-
[flink] 01/02: [FLINK-26287][table] Add convenience method TableConfig.set
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit d2e947361d6e413f28b2cccebeec2e1761bc253b Author: Timo Walther AuthorDate: Tue Feb 22 17:30:45 2022 +0100 [FLINK-26287][table] Add convenience method TableConfig.set --- .../docs/dev/python/dependency_management.md | 2 +- docs/content.zh/docs/dev/table/config.md | 24 .../docs/dev/table/sql/queries/overview.md | 12 ++-- docs/content.zh/docs/dev/table/tuning.md | 41 ++--- .../docs/dev/python/dependency_management.md | 2 +- docs/content/docs/dev/table/config.md | 24 .../content/docs/dev/table/sql/queries/overview.md | 8 +-- docs/content/docs/dev/table/tuning.md | 69 +++--- flink-python/pyflink/table/table_config.py | 9 +++ .../org/apache/flink/table/api/TableConfig.java| 49 +-- 10 files changed, 141 insertions(+), 99 deletions(-) diff --git a/docs/content.zh/docs/dev/python/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md index 14ca3cd..c818686 100644 --- a/docs/content.zh/docs/dev/python/dependency_management.md +++ b/docs/content.zh/docs/dev/python/dependency_management.md @@ -302,7 +302,7 @@ import org.apache.flink.table.api.TableEnvironment; TableEnvironment tEnv = TableEnvironment.create( EnvironmentSettings.inBatchMode()); -tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); +tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1); // register the Python UDF tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python"); diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md index 2a4ac8f..70939a9 100644 --- a/docs/content.zh/docs/dev/table/config.md +++ b/docs/content.zh/docs/dev/table/config.md @@ -49,11 +49,11 @@ Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可 TableEnvironment tEnv = ... // access flink configuration -Configuration configuration = tEnv.getConfig().getConfiguration(); +TableConfig configuration = tEnv.getConfig(); // set low-level key-value options -configuration.setString("table.exec.mini-batch.enabled", "true"); -configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); -configuration.setString("table.exec.mini-batch.size", "5000"); +configuration.set("table.exec.mini-batch.enabled", "true"); +configuration.set("table.exec.mini-batch.allow-latency", "5 s"); +configuration.set("table.exec.mini-batch.size", "5000"); ``` {{< /tab >}} {{< tab "Scala" >}} @@ -62,11 +62,11 @@ configuration.setString("table.exec.mini-batch.size", "5000"); val tEnv: TableEnvironment = ... // access flink configuration -val configuration = tEnv.getConfig().getConfiguration() +val configuration = tEnv.getConfig() // set low-level key-value options -configuration.setString("table.exec.mini-batch.enabled", "true") -configuration.setString("table.exec.mini-batch.allow-latency", "5 s") -configuration.setString("table.exec.mini-batch.size", "5000") +configuration.set("table.exec.mini-batch.enabled", "true") +configuration.set("table.exec.mini-batch.allow-latency", "5 s") +configuration.set("table.exec.mini-batch.size", "5000") ``` {{< /tab >}} {{< tab "Python" >}} @@ -75,11 +75,11 @@ configuration.setString("table.exec.mini-batch.size", "5000") t_env = ... # access flink configuration -configuration = t_env.get_config().get_configuration() +configuration = t_env.get_config() # set low-level key-value options -configuration.set_string("table.exec.mini-batch.enabled", "true") -configuration.set_string("table.exec.mini-batch.allow-latency", "5 s") -configuration.set_string("table.exec.mini-batch.size", "5000") +configuration.set("table.exec.mini-batch.enabled", "true") +configuration.set("table.exec.mini-batch.allow-latency", "5 s") +configuration.set("table.exec.mini-batch.size", "5000") ``` {{< /tab >}} {{< tab "SQL CLI" >}} diff --git a/docs/content.zh/docs/dev/table/sql/queries/overview.md b/docs/content.zh/docs/dev/table/sql/queries/overview.md index 4d5..a1074a8 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/overview.md +++ b/docs/content.zh/docs/dev/table/sql/queries/overview.md @@ -205,10 +205,10 @@ tableResult2.print(); val env = StreamExecutionEnvironment.getExecutionEnvironment() val tableEnv = StreamTableEnvironment.create(env, settings) // enable checkpointing -tableEnv.getConfig.getConfiguration.set( - ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) -tableEnv.getConfig.getConfiguration.set( - ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)) +tableEnv.getConfig + .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) +tableEnv.getConfig +
[flink] branch master updated (a34b448 -> 44da40f)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from a34b448 [FLINK-26354] -restoreMode should be --restoreMode and should have a shorthand add f4e5836 [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments add cb3dfb5 [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest add 44da40f [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest No new revisions were added by this update. Summary of changes: .../io/network/buffer/NetworkBufferPoolTest.java | 101 - 1 file changed, 59 insertions(+), 42 deletions(-)
[flink] branch master updated: [FLINK-26354] -restoreMode should be --restoreMode and should have a shorthand
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new a34b448 [FLINK-26354] -restoreMode should be --restoreMode and should have a shorthand a34b448 is described below commit a34b448c8e8f0dc7705548c028dd463a2fb0ddc9 Author: Dawid Wysakowicz AuthorDate: Fri Feb 25 09:05:40 2022 +0100 [FLINK-26354] -restoreMode should be --restoreMode and should have a shorthand --- .../apache/flink/client/cli/CliFrontendParser.java | 1 + .../flink/client/cli/CliFrontendRunTest.java | 59 +- 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index ef202f6..d9b2370 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -134,6 +134,7 @@ public class CliFrontendParser { public static final Option SAVEPOINT_RESTORE_MODE = new Option( +"rm", "restoreMode", true, "Defines how should we restore from the given savepoint. Supported options: " diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index 2d10ad11..ea82cc9 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -133,50 +133,37 @@ public class CliFrontendRunTest extends CliFrontendTestBase { @Test public void testClaimRestoreModeParsing() throws Exception { -// test configure savepoint with claim mode -String[] parameters = { -"-s", "expectedSavepointPath", "-n", "-restoreMode", "claim", getTestJarPath() -}; - -CommandLine commandLine = -CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); -ProgramOptions programOptions = ProgramOptions.create(commandLine); -ExecutionConfigAccessor executionOptions = -ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); - -SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); -assertTrue(savepointSettings.restoreSavepoint()); -assertEquals(RestoreMode.CLAIM, savepointSettings.getRestoreMode()); -assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); -assertTrue(savepointSettings.allowNonRestoredState()); +testRestoreMode("-rm", "claim", RestoreMode.CLAIM); } @Test public void testLegacyRestoreModeParsing() throws Exception { -// test configure savepoint with claim mode -String[] parameters = { -"-s", "expectedSavepointPath", "-n", "-restoreMode", "legacy", getTestJarPath() -}; +testRestoreMode("-rm", "legacy", RestoreMode.LEGACY); +} -CommandLine commandLine = -CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); -ProgramOptions programOptions = ProgramOptions.create(commandLine); -ExecutionConfigAccessor executionOptions = -ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); +@Test +public void testNoClaimRestoreModeParsing() throws Exception { +testRestoreMode("-rm", "no_claim", RestoreMode.NO_CLAIM); +} -SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); -assertTrue(savepointSettings.restoreSavepoint()); -assertEquals(RestoreMode.LEGACY, savepointSettings.getRestoreMode()); -assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); -assertTrue(savepointSettings.allowNonRestoredState()); +@Test +public void testClaimRestoreModeParsingLongOption() throws Exception { +testRestoreMode("--restoreMode", "claim", RestoreMode.CLAIM); } @Test -public void testNoClaimRestoreModeParsing() throws Exception { -// test configure savepoint with claim mode -String[] parameters = { -"-s", "expectedSavepointPath", "-n", "-restoreMode", "no_claim", getTestJarPath() -}; +public void testLegacyRestoreModeParsingLongOption() throws Exception { +testRestoreMode("--restoreMode", "legacy", RestoreMode.LEGACY); +} + +@Test +public void testNoClaimRestoreModeParsingLongOption() throws Exception { +
svn commit: r52746 - in /dev/flink/flink-1.14.4-rc1: ./ python/
Author: knaufk Date: Fri Feb 25 13:29:00 2022 New Revision: 52746 Log: Add flink-1.14.4-rc1 Added: dev/flink/flink-1.14.4-rc1/ dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.asc (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.sha512 dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz.asc (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz.sha512 dev/flink/flink-1.14.4-rc1/flink-1.14.4-src.tgz (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-src.tgz.asc (with props) dev/flink/flink-1.14.4-rc1/flink-1.14.4-src.tgz.sha512 dev/flink/flink-1.14.4-rc1/python/ dev/flink/flink-1.14.4-rc1/python/apache-flink-1.14.4.tar.gz (with props) dev/flink/flink-1.14.4-rc1/python/apache-flink-1.14.4.tar.gz.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache-flink-1.14.4.tar.gz.sha512 dev/flink/flink-1.14.4-rc1/python/apache-flink-libraries-1.14.4.tar.gz (with props) dev/flink/flink-1.14.4-rc1/python/apache-flink-libraries-1.14.4.tar.gz.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache-flink-libraries-1.14.4.tar.gz.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-macosx_10_9_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-manylinux1_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp36-cp36m-manylinux1_x86_64.whl.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-macosx_10_9_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-manylinux1_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp37-cp37m-manylinux1_x86_64.whl.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-macosx_10_9_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-macosx_10_9_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-macosx_10_9_x86_64.whl.sha512 dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-manylinux1_x86_64.whl (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-manylinux1_x86_64.whl.asc (with props) dev/flink/flink-1.14.4-rc1/python/apache_flink-1.14.4-cp38-cp38-manylinux1_x86_64.whl.sha512 Added: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz -- svn:mime-type = application/gzip Added: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.asc == Binary file - no diff available. Propchange: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.asc -- svn:mime-type = application/pgp-signature Added: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.sha512 == --- dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.sha512 (added) +++ dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.11.tgz.sha512 Fri Feb 25 13:29:00 2022 @@ -0,0 +1 @@ +923f2d08d58d91809940f7d458b797b7099ce566ed635c5456a0e36c3ba5b1a200dd595113ffcdd972304a691baf6222b48f2fe7807c9b6286ff60c930a1df5f flink-1.14.4-bin-scala_2.11.tgz Added: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz == Binary file - no diff available. Propchange: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz -- svn:mime-type = application/gzip Added: dev/flink/flink-1.14.4-rc1/flink-1.14.4-bin-scala_2.12.tgz.asc
[flink] annotated tag release-1.14.4-rc1 updated (895c609 -> d3db692)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a change to annotated tag release-1.14.4-rc1 in repository https://gitbox.apache.org/repos/asf/flink.git. *** WARNING: tag release-1.14.4-rc1 was modified! *** from 895c609 (commit) to d3db692 (tag) tagging 895c60940a8a7c95bef1ebe9f92c0baf168be145 (commit) replaces pre-apache-rename by Konstantin Knauf on Fri Feb 25 11:58:38 2022 +0100 - Log - release-1.14.4-rc1 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiEEzPqFL9A5OAqz7DbQjD+wB/5g3voFAmIYtl4ACgkQjD+wB/5g 3vrs8g/+NtgVefzyRW8Tx/+9lQy1BGhpcmkvT69WN0iz021ttTZYw2OAfcG00PyW 2tiqca1D0lC1OTJLCn/E+W3fBVnlSUyDGBv+JIB/inCQj9VtUpczbaUd1sDLfvQx R9vT1JD6PffneYhs+ScES12G8wjsEtPVcfI10NeF09ZOe/f7MmFsYHebqLTrWNEY YWcWgI/vmBakuuDIEstAylTUF3E8PiIuhFgUvjp8U2glTLJLnfyoHpynwIWfjpqG dl/KvAN6pKNwYBBfekgdiZHSCTMI/wIC8k3MkK+Nwq+5EwQKI8KNNS1+K84mvVMs 2EKlPMZsXf/KP3QYNSKfqZElQB7zKc5Q+TD8ln6JRy2xbqv2HCJLUK9XMusaRIWq 6OQ/1vGtAQ0ePmV3zFjA5cIq+dUiwsNfoxw8LkTevnQ8o+v+Y5u1z4Q1F7joTrNg kO4TUelRsiY5M1mKpB8Cs+2Xw8FReSmGRm13wF7xWCVkzYIgEc9KMseFWC+AeB+a lX62vZ7yX7ClyiyUxZ2kC7nPFy9SXF9s78POzVOGB32hX89v8qufao1Aww4iBjsU Lo2V368qa9WLNXCda416tSKz87gwvMf++guXQHrSRSFBPh97tm0VRAMLNmztOgYd BmJ32y6cfAhEyqfq2TgylGU0vaSKn7gJK3RLeqou2cXdvRN2jto= =FSLj -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
wangyang0918 commented on a change in pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#discussion_r814713388 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java ## @@ -145,11 +190,16 @@ private void updateForReconciliationError(FlinkDeployment flinkApp, String err) @Override public List prepareEventSources( EventSourceContext eventSourceContext) { -// TODO: start status updated -//return List.of(new PerResourcePollingEventSource<>( -//new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD, -//FlinkApplication.class)); -return Collections.emptyList(); +// reconcile when job manager deployment and REST API are ready +SharedIndexInformer deploymentInformer = +kubernetesClient +.apps() +.deployments() +.inAnyNamespace() +.withLabel("type", "flink-native-kubernetes") Review comment: Using constants in `org.apache.flink.kubernetes.utils.Constants` could make us aware of upstream changes. ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java ## @@ -114,6 +126,39 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { } catch (Exception e) { throw new ReconciliationException(e); } + +return checkDeployment(flinkApp, context); +} + +private UpdateControl checkDeployment( +FlinkDeployment flinkApp, Context context) { +if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) { Review comment: Why do we use `selfLink` here? I am afraid the `selfLink` field does not always exist. ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java ## @@ -145,11 +190,16 @@ private void updateForReconciliationError(FlinkDeployment flinkApp, String err) @Override public List prepareEventSources( EventSourceContext eventSourceContext) { -// TODO: start status updated -//return List.of(new PerResourcePollingEventSource<>( -//new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD, -//FlinkApplication.class)); -return Collections.emptyList(); +// reconcile when job manager deployment and REST API are ready +SharedIndexInformer deploymentInformer = +kubernetesClient +.apps() +.deployments() +.inAnyNamespace() Review comment: Maybe we do not need to watch all the namespaces if `FLINK_OPERATOR_WATCH_NAMESPACES` configured. ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java ## @@ -114,6 +126,39 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { } catch (Exception e) { throw new ReconciliationException(e); } + +return checkDeployment(flinkApp, context); +} + +private UpdateControl checkDeployment( +FlinkDeployment flinkApp, Context context) { +if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) { +Optional deployment = context.getSecondaryResource(Deployment.class); +if (deployment.isPresent()) { +DeploymentStatus status = deployment.get().getStatus(); +DeploymentSpec spec = deployment.get().getSpec(); +if (status != null +&& status.getAvailableReplicas() != null +&& spec.getReplicas().intValue() == status.getReplicas() +&& spec.getReplicas().intValue() == status.getAvailableReplicas()) { +LOG.info( +"JobManager deployment {} in namespace {} is ready", +flinkApp.getMetadata().getName(), +flinkApp.getMetadata().getNamespace()); + jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink()); Review comment: Do we need to remove the current flinkApp from cache `jobManagerDeployments` when the replicas is not enough? For example, the JobManager crashed for a while. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact
[flink] branch master updated (393bc07 -> 3ebe919)
This is an automated email from the ASF dual-hosted git repository. jingzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 393bc07 [typos] Typos in PULL_REQUEST_TEMPLATE.md add 3ebe919 [FLINK-21565][table-planner] Support more integer types in TIMESTAMPADD This closes #17793 No new revisions were added by this update. Summary of changes: .../casting/NumericPrimitiveCastRule.java | 16 ++- .../planner/expressions/ScalarFunctionsTest.scala | 44 .../planner/runtime/batch/sql/CalcITCase.scala | 114 + 3 files changed, 172 insertions(+), 2 deletions(-)
[flink] branch master updated (aeb3822 -> 393bc07)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from aeb3822 [FLINK-25927][docs][formats] Add DataStream documentation for CSV format add 393bc07 [typos] Typos in PULL_REQUEST_TEMPLATE.md No new revisions were added by this update. Summary of changes: .github/PULL_REQUEST_TEMPLATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
wangyang0918 commented on pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#issuecomment-1050768766 I am getting to this PR now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated: [FLINK-25927][docs][formats] Add DataStream documentation for CSV format
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new aeb3822 [FLINK-25927][docs][formats] Add DataStream documentation for CSV format aeb3822 is described below commit aeb3822ece887734dcaed5b2554f5583488d2dc0 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Feb 24 01:34:18 2022 +0100 [FLINK-25927][docs][formats] Add DataStream documentation for CSV format --- .../docs/connectors/datastream/formats/csv.md | 60 ++ 1 file changed, 60 insertions(+) diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md new file mode 100644 index 000..15d47ed --- /dev/null +++ b/docs/content/docs/connectors/datastream/formats/csv.md @@ -0,0 +1,60 @@ +--- +title: "CSV" +weight: 4 +type: docs +aliases: +- /dev/connectors/formats/csv.html +- /apis/streaming/connectors/formats/csv.html +--- + + + +# CSV format + +To use the CSV format you need to add the Flink CSV dependency to your project: + +```xml + + org.apache.flink + flink-csv + {{< version >}} + +``` + +Flink supports reading CSV files using `CsvReaderFormat`. The reader utilizes Jackson library and allows passing the corresponding configuration for the CSV schema and parsing options. + +`CsvReaderFormat` can be initialized and used like this: +```java +CsvReaderFormat csvFormat = CsvReaderFormat.forPojo(SomePojo.class); +FileSource source = +FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build(); +``` + +The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition with the fields order exactly matching those of the CSV file columns). + +If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`: + +```java +CsvReaderFormat forSchema(CsvMapper mapper, + CsvSchema schema, + TypeInformation typeInformation) +``` + +Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
[flink] branch master updated (c511c9a -> 4270d86)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c511c9a [FLINK-25761][docs-zh] Translate Avro format page into Chinese (#18641) add 4270d86 [FLINK-26320][docs] Update the default value from "1m" to "1min" in Hive page No new revisions were added by this update. Summary of changes: docs/content.zh/docs/connectors/table/hive/hive_read_write.md | 2 +- docs/content/docs/connectors/table/hive/hive_read_write.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[flink] branch master updated (771e08d -> c511c9a)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 771e08d [FLINK-26186][test] Allow caller annotated with @VisibleForTesting call target method annotated with VisibleForTesting. add c511c9a [FLINK-25761][docs-zh] Translate Avro format page into Chinese (#18641) No new revisions were added by this update. Summary of changes: .../docs/connectors/datastream/formats/avro.md | 17 + 1 file changed, 9 insertions(+), 8 deletions(-)
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #21: [FLINK-26261] Wait for JobManager deployment ready before accessing REST API
gyfora commented on pull request #21: URL: https://github.com/apache/flink-kubernetes-operator/pull/21#issuecomment-1050687538 @wangyang0918 if you have some time to look at this we could use a bit of extra feedback :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #23: [FLINK-26178] Use the same enum for expected and observed jobstate
bgeng777 commented on pull request #23: URL: https://github.com/apache/flink-kubernetes-operator/pull/23#issuecomment-1050674284 Will start refining this PR once [FLINK-26139](https://issues.apache.org/jira/browse/FLINK-26139) is done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] bgeng777 closed pull request #23: [FLINK-26178] Use the same enum for expected and observed jobstate
bgeng777 closed pull request #23: URL: https://github.com/apache/flink-kubernetes-operator/pull/23 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #23: [FLINK-26178] Use the same enum for expected and observed jobstate
bgeng777 commented on a change in pull request #23: URL: https://github.com/apache/flink-kubernetes-operator/pull/23#discussion_r814598597 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java ## @@ -95,7 +95,7 @@ private JobStatus mergeJobStatus( if (newStatus == null) { newStatus = createJobStatus(newJob); } else { -newStatus.setState(newJob.getJobState().name()); +newStatus.setState(JobState.valueOf(newJob.getJobState().name())); Review comment: Oh, it doesn't matter. The work is not a waste at all, I may just need to refine it once we achieve consensus on the state transition. Thanks a lot for the above review and I will close this PR and may reopen it when [FLINK-26139](https://issues.apache.org/jira/browse/FLINK-26139) is done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (3d87483 -> 771e08d)
This is an automated email from the ASF dual-hosted git repository. fpaul pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 3d87483 [hotfix][table-planner] Clean up serde classes add 5e1ca93 [FLINK-26186][test] use reside in shaded package filter add 771e08d [FLINK-26186][test] Allow caller annotated with @VisibleForTesting call target method annotated with VisibleForTesting. No new revisions were added by this update. Summary of changes: .../18509c9e-3250-4c52-91b9-11ccefc85db1 | 222 +++--- .../5b9eed8a-5fb6-4373-98ac-3be2a71941b8 | 811 ++--- .../7602816f-5c01-4b7a-9e3e-235dfedec245 | 2 +- .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 | 37 +- .../architecture/rules/ApiAnnotationRules.java | 15 + 5 files changed, 543 insertions(+), 544 deletions(-)
[flink] 02/02: [hotfix][table-planner] Clean up serde classes
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 3d8748316f597e6936442dc514f266ffbcce4bed Author: Timo Walther AuthorDate: Thu Feb 24 14:21:25 2022 +0100 [hotfix][table-planner] Clean up serde classes --- .../plan/abilities/sink/WritingMetadataSpec.java | 6 --- .../abilities/source/SourceAbilitySpecBase.java| 6 --- .../planner/plan/logical/WindowingStrategy.java| 6 --- .../table/planner/plan/nodes/exec/ExecNode.java| 6 --- .../planner/plan/nodes/exec/InputProperty.java | 6 --- .../plan/nodes/exec/common/CommonExecValues.java | 10 +--- .../exec/serde/AggregateCallJsonDeserializer.java | 12 +++-- .../exec/serde/AggregateCallJsonSerializer.java| 18 +++ .../exec/serde/ChangelogModeJsonDeserializer.java | 14 ++--- .../exec/serde/ChangelogModeJsonSerializer.java| 11 ++-- .../nodes/exec/serde/ColumnJsonDeserializer.java | 11 +++- .../nodes/exec/serde/ColumnJsonSerializer.java | 31 ++- .../ContextResolvedTableJsonDeserializer.java | 11 +++- .../serde/ContextResolvedTableJsonSerializer.java | 16 -- .../nodes/exec/serde/DataTypeJsonDeserializer.java | 4 +- .../nodes/exec/serde/DataTypeJsonSerializer.java | 20 .../exec/serde/ExecNodeGraphJsonDeserializer.java | 13 +++-- .../exec/serde/ExecNodeGraphJsonSerializer.java| 11 ++-- .../exec/serde/FlinkVersionJsonDeserializer.java | 12 +++-- .../exec/serde/FlinkVersionJsonSerializer.java | 12 +++-- .../plan/nodes/exec/serde/JsonPlanEdge.java| 24 + .../plan/nodes/exec/serde/JsonPlanGraph.java | 16 +++--- .../plan/nodes/exec/serde/JsonSerdeUtil.java | 24 - .../exec/serde/LogicalTypeJsonDeserializer.java| 4 +- .../exec/serde/LogicalTypeJsonSerializer.java | 60 +++--- .../exec/serde/LogicalWindowJsonDeserializer.java | 11 ++-- .../exec/serde/LogicalWindowJsonSerializer.java| 39 +++--- .../serde/ObjectIdentifierJsonDeserializer.java| 12 +++-- .../exec/serde/ObjectIdentifierJsonSerializer.java | 12 +++-- .../exec/serde/RelDataTypeJsonDeserializer.java| 4 +- .../exec/serde/RelDataTypeJsonSerializer.java | 4 +- .../RequiredDistributionJsonDeserializer.java | 15 -- .../serde/RequiredDistributionJsonSerializer.java | 12 +++-- .../ResolvedCatalogTableJsonDeserializer.java | 11 +++- .../serde/ResolvedCatalogTableJsonSerializer.java | 19 --- .../serde/ResolvedExpressionJsonDeserializer.java | 11 +++- .../serde/ResolvedExpressionJsonSerializer.java| 19 --- .../exec/serde/ResolvedSchemaJsonDeserializer.java | 11 +++- .../exec/serde/ResolvedSchemaJsonSerializer.java | 17 -- .../nodes/exec/serde/RexNodeJsonDeserializer.java | 4 +- .../nodes/exec/serde/RexNodeJsonSerializer.java| 60 +++--- .../exec/serde/RexWindowBoundJsonDeserializer.java | 11 ++-- .../exec/serde/RexWindowBoundJsonSerializer.java | 28 +- .../plan/nodes/exec/serde/SerdeContext.java| 2 + .../nodes/exec/serde/ShuffleJsonDeserializer.java | 15 -- .../nodes/exec/serde/ShuffleJsonSerializer.java| 12 +++-- .../nodes/exec/serde/UniqueConstraintMixin.java| 10 ++-- .../plan/nodes/exec/serde/WatermarkSpecMixin.java | 6 ++- .../serde/WindowReferenceJsonDeserializer.java | 4 +- .../exec/serde/WindowReferenceJsonSerializer.java | 8 +-- .../planner/plan/nodes/exec/spec/OverSpec.java | 9 .../stream/StreamExecGlobalGroupAggregate.java | 6 --- .../stream/StreamExecGlobalWindowAggregate.java| 6 --- .../stream/StreamExecGroupWindowAggregate.java | 6 --- .../StreamExecIncrementalGroupAggregate.java | 6 --- .../plan/nodes/exec/stream/StreamExecSink.java | 6 --- .../table/planner/plan/utils/LookupJoinUtil.java | 6 --- 57 files changed, 416 insertions(+), 350 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java index 46a9845..dcdd948 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java @@ -21,8 +21,6 @@ package org.apache.flink.table.planner.plan.abilities.sink; import org.apache.flink.table.api.TableException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer; -import
[flink] 01/02: [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5c3c90d29024370438d41255aa2650758d9c757f Author: Timo Walther AuthorDate: Mon Feb 21 15:51:37 2022 +0100 [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan This closes #18872. --- .../exec/serde/AggregateCallJsonDeserializer.java | 143 +++-- .../exec/serde/AggregateCallJsonSerializer.java| 73 +++ .../nodes/exec/serde/RexNodeJsonDeserializer.java | 6 +- .../nodes/exec/serde/RexNodeJsonSerializer.java| 5 +- .../ExpandJsonPlanTest_jsonplan/testExpand.out | 25 +--- ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 39 ++ ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 78 +++ ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 19 +-- ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 38 ++ ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 25 +--- ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 50 ++- ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 37 +- ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 37 +- .../testEventTimeHopWindow.out | 13 +- .../testEventTimeSessionWindow.out | 13 +- .../testEventTimeTumbleWindow.out | 30 + .../testProcTimeHopWindow.out | 6 +- .../testProcTimeSessionWindow.out | 6 +- .../testProcTimeTumbleWindow.out | 7 +- .../testIncrementalAggregate.out | 20 +-- ...lAggregateWithSumCountDistinctAndRetraction.out | 84 +++- .../testInnerJoinWithPk.out| 12 +- .../testProcTimeBoundedNonPartitionedRangeOver.out | 7 +- .../testProcTimeBoundedPartitionedRangeOver.out| 13 +- ...undedPartitionedRowsOverWithBuiltinProctime.out | 19 +-- .../testProcTimeUnboundedPartitionedRangeOver.out | 13 +- ...stProctimeBoundedDistinctPartitionedRowOver.out | 13 +- ...edDistinctWithNonDistinctPartitionedRowOver.out | 33 ++--- .../testRowTimeBoundedPartitionedRowsOver.out | 7 +- .../testDistinctSplitEnabled.out | 76 +++ .../testEventTimeCumulateWindow.out| 26 +--- .../testEventTimeCumulateWindowWithOffset.out | 26 +--- .../testEventTimeHopWindow.out | 26 +--- .../testEventTimeHopWindowWithOffset.out | 26 +--- .../testEventTimeTumbleWindow.out | 60 ++--- .../testEventTimeTumbleWindowWithOffset.out| 60 ++--- .../testProcTimeCumulateWindow.out | 7 +- .../testProcTimeHopWindow.out | 6 +- .../testProcTimeTumbleWindow.out | 7 +- .../testEventTimeTumbleWindow.out | 56 +++- 40 files changed, 268 insertions(+), 979 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java index 69c1cce..28eef53 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java @@ -18,24 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.catalog.ContextResolvedFunction; -import org.apache.flink.table.catalog.DataTypeFactory; -import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.functions.FunctionIdentifier; -import org.apache.flink.table.functions.FunctionKind; -import org.apache.flink.table.functions.ImperativeAggregateFunction; -import org.apache.flink.table.functions.UserDefinedFunctionHelper; -import org.apache.flink.table.module.CoreModule; -import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction; -import org.apache.flink.table.planner.functions.utils.AggSqlFunction; -import org.apache.flink.table.types.inference.TypeInference; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.util.Preconditions; - import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import
[flink] branch master updated (8d86133 -> 3d87483)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 8d86133 [FLINK-26121][runtime] Adds clearUnhandledEvents() method new 5c3c90d [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan new 3d87483 [hotfix][table-planner] Clean up serde classes The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../plan/abilities/sink/WritingMetadataSpec.java | 6 - .../abilities/source/SourceAbilitySpecBase.java| 6 - .../planner/plan/logical/WindowingStrategy.java| 6 - .../table/planner/plan/nodes/exec/ExecNode.java| 6 - .../planner/plan/nodes/exec/InputProperty.java | 6 - .../plan/nodes/exec/common/CommonExecValues.java | 10 +- .../exec/serde/AggregateCallJsonDeserializer.java | 153 - .../exec/serde/AggregateCallJsonSerializer.java| 89 .../exec/serde/ChangelogModeJsonDeserializer.java | 14 +- .../exec/serde/ChangelogModeJsonSerializer.java| 11 +- .../nodes/exec/serde/ColumnJsonDeserializer.java | 11 +- .../nodes/exec/serde/ColumnJsonSerializer.java | 31 +++-- .../ContextResolvedTableJsonDeserializer.java | 11 +- .../serde/ContextResolvedTableJsonSerializer.java | 16 ++- .../nodes/exec/serde/DataTypeJsonDeserializer.java | 4 +- .../nodes/exec/serde/DataTypeJsonSerializer.java | 20 +-- .../exec/serde/ExecNodeGraphJsonDeserializer.java | 13 +- .../exec/serde/ExecNodeGraphJsonSerializer.java| 11 +- .../exec/serde/FlinkVersionJsonDeserializer.java | 12 +- .../exec/serde/FlinkVersionJsonSerializer.java | 12 +- .../plan/nodes/exec/serde/JsonPlanEdge.java| 24 ++-- .../plan/nodes/exec/serde/JsonPlanGraph.java | 16 ++- .../plan/nodes/exec/serde/JsonSerdeUtil.java | 24 ++-- .../exec/serde/LogicalTypeJsonDeserializer.java| 4 +- .../exec/serde/LogicalTypeJsonSerializer.java | 60 .../exec/serde/LogicalWindowJsonDeserializer.java | 11 +- .../exec/serde/LogicalWindowJsonSerializer.java| 39 +++--- .../serde/ObjectIdentifierJsonDeserializer.java| 12 +- .../exec/serde/ObjectIdentifierJsonSerializer.java | 12 +- .../exec/serde/RelDataTypeJsonDeserializer.java| 4 +- .../exec/serde/RelDataTypeJsonSerializer.java | 4 +- .../RequiredDistributionJsonDeserializer.java | 15 +- .../serde/RequiredDistributionJsonSerializer.java | 12 +- .../ResolvedCatalogTableJsonDeserializer.java | 11 +- .../serde/ResolvedCatalogTableJsonSerializer.java | 19 ++- .../serde/ResolvedExpressionJsonDeserializer.java | 11 +- .../serde/ResolvedExpressionJsonSerializer.java| 19 ++- .../exec/serde/ResolvedSchemaJsonDeserializer.java | 11 +- .../exec/serde/ResolvedSchemaJsonSerializer.java | 17 ++- .../nodes/exec/serde/RexNodeJsonDeserializer.java | 10 +- .../nodes/exec/serde/RexNodeJsonSerializer.java| 65 - .../exec/serde/RexWindowBoundJsonDeserializer.java | 11 +- .../exec/serde/RexWindowBoundJsonSerializer.java | 28 ++-- .../plan/nodes/exec/serde/SerdeContext.java| 2 + .../nodes/exec/serde/ShuffleJsonDeserializer.java | 15 +- .../nodes/exec/serde/ShuffleJsonSerializer.java| 12 +- .../nodes/exec/serde/UniqueConstraintMixin.java| 10 +- .../plan/nodes/exec/serde/WatermarkSpecMixin.java | 6 +- .../serde/WindowReferenceJsonDeserializer.java | 4 +- .../exec/serde/WindowReferenceJsonSerializer.java | 8 +- .../planner/plan/nodes/exec/spec/OverSpec.java | 9 -- .../stream/StreamExecGlobalGroupAggregate.java | 6 - .../stream/StreamExecGlobalWindowAggregate.java| 6 - .../stream/StreamExecGroupWindowAggregate.java | 6 - .../StreamExecIncrementalGroupAggregate.java | 6 - .../plan/nodes/exec/stream/StreamExecSink.java | 6 - .../table/planner/plan/utils/LookupJoinUtil.java | 6 - .../ExpandJsonPlanTest_jsonplan/testExpand.out | 25 +--- ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 39 ++ ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 78 +++ ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 19 +-- ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 38 ++--- ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 25 +--- ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 50 ++- ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 37 + ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 37 + .../testEventTimeHopWindow.out | 13 +- .../testEventTimeSessionWindow.out | 13 +- .../testEventTimeTumbleWindow.out | 30 +--- .../testProcTimeHopWindow.out
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #23: [FLINK-26178] Use the same enum for expected and observed jobstate
gyfora commented on a change in pull request #23: URL: https://github.com/apache/flink-kubernetes-operator/pull/23#discussion_r814563646 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java ## @@ -95,7 +95,7 @@ private JobStatus mergeJobStatus( if (newStatus == null) { newStatus = createJobStatus(newJob); } else { -newStatus.setState(newJob.getJobState().name()); +newStatus.setState(JobState.valueOf(newJob.getJobState().name())); Review comment: Yea I agree we need to first figure out how exactly we want to handle the state transitions and react to different job states. I am sorry to have wasted your time with this ticket, I should have thought more about it before opening it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (ec83c08 -> 8d86133)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ec83c08 [FLINK-25848][connectors/firehose] Separated unit style tests for the KDF Sink into KinesisFirehoseSinkTest add 7b9b21d [hotfix] Uses close instead of stop in ZooKeeperExtension post-test cleanup add 6431155 [hotfix] Aligns state check in ZooKeeperExtension add 55d62c2 [hotfix] Migrates ZooKeeperLeaderRetrievalConnectionHandlingTest to JUnit5/assertj add 8d86133 [FLINK-26121][runtime] Adds clearUnhandledEvents() method No new revisions were added by this update. Summary of changes: ...eeperLeaderRetrievalConnectionHandlingTest.java | 460 ++--- .../runtime/zookeeper/ZooKeeperExtension.java | 74 +++- 2 files changed, 271 insertions(+), 263 deletions(-)
[flink-kubernetes-operator] 02/02: [FLINK-26141] Add e2e test to guard last state upgrade
This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit baaed88b21a27df0eb26a5bdce4516ec3c7510c1 Author: wangyang0918 AuthorDate: Thu Feb 24 16:32:05 2022 +0800 [FLINK-26141] Add e2e test to guard last state upgrade This closes #22. --- .github/workflows/ci.yml | 5 ++- e2e-tests/data/cr.yaml | 1 + e2e-tests/test_last_state_upgrade.sh | 68 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 343142a..6f3345a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -106,7 +106,10 @@ jobs: kubectl get pods - name: Run Flink e2e tests run: | - ./e2e-tests/test_kubernetes_application_ha.sh + ls e2e-tests/test_*.sh | while read script_test;do \ +echo "Running $script_test" +bash $script_test || exit 1 + done - name: Stop the operator run: | helm uninstall flink-operator diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml index 936a1de..90069f9 100644 --- a/e2e-tests/data/cr.yaml +++ b/e2e-tests/data/cr.yaml @@ -78,6 +78,7 @@ spec: jarURI: local:///opt/flink/usrlib/myjob.jar entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample parallelism: 2 +upgradeMode: last-state --- apiVersion: v1 diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh new file mode 100755 index 000..20ca7d0 --- /dev/null +++ b/e2e-tests/test_last_state_upgrade.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/utils.sh + +CLUSTER_ID="flink-example-statemachine" +TIMEOUT=300 + +function cleanup_and_exit() { +if [ $TRAPPED_EXIT_CODE != 0 ];then + debug_and_show_logs +fi + +kubectl delete -f e2e-tests/data/cr.yaml +kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" +kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability" +} + +function wait_for_jobmanager_running() { +retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1 + +kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1 +jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') + +echo "Waiting for jobmanager pod ${jm_pod_name} ready." +kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1 + +wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1 +} + +on_exit cleanup_and_exit + +retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1 + +wait_for_jobmanager_running + +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 + +job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') + +# Update the FlinkDeployment and trigger the last state upgrade +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"jobManager": {"resource": {"cpu": 0.51, "memory": "1024m"} } } }' + +kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" +wait_for_jobmanager_running + +# Check the new JobManager recovering from latest successful checkpoint +wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1 +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 + +echo "Successfully run the last-state upgrade test" +
[flink-kubernetes-operator] branch main updated (85d997e -> baaed88)
This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git. from 85d997e [FLINK-26136] Extract shared deployment validation logic new 2e6e978 [FLINK-26141] Support last-state upgrade mode new baaed88 [FLINK-26141] Add e2e test to guard last state upgrade The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci.yml | 5 +- e2e-tests/data/cr.yaml | 1 + ...pplication_ha.sh => test_last_state_upgrade.sh} | 30 ++-- .../operator/reconciler/JobReconciler.java | 57 +++ .../kubernetes/operator/service/FlinkService.java | 15 +- .../kubernetes/operator/utils/FlinkUtils.java | 12 +- .../kubernetes/operator/TestingClusterClient.java | 149 ++ .../operator/reconciler/JobReconcilerTest.java | 80 -- .../operator/service/FlinkServiceTest.java | 175 + 9 files changed, 467 insertions(+), 57 deletions(-) copy e2e-tests/{test_kubernetes_application_ha.sh => test_last_state_upgrade.sh} (66%) create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
[flink-kubernetes-operator] 01/02: [FLINK-26141] Support last-state upgrade mode
This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git commit 2e6e978abd499d1b534e069b1d296f33d4300be9 Author: wangyang0918 AuthorDate: Thu Feb 24 15:47:41 2022 +0800 [FLINK-26141] Support last-state upgrade mode --- .../operator/reconciler/JobReconciler.java | 57 +++ .../kubernetes/operator/service/FlinkService.java | 15 +- .../kubernetes/operator/utils/FlinkUtils.java | 12 +- .../kubernetes/operator/TestingClusterClient.java | 149 ++ .../operator/reconciler/JobReconcilerTest.java | 80 -- .../operator/service/FlinkServiceTest.java | 175 + 6 files changed, 444 insertions(+), 44 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index dee22fb..fb599e8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; -import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -80,23 +79,21 @@ public class JobReconciler { upgradeFlinkJob(flinkApp, effectiveConfig); } if (desiredJobState.equals(JobState.SUSPENDED)) { -if (upgradeMode == UpgradeMode.STATELESS) { -cancelJob(flinkApp, effectiveConfig); -} else { -suspendJob(flinkApp, effectiveConfig); -} +printCancelLogs(upgradeMode, flinkApp.getMetadata().getName()); +cancelJob(flinkApp, upgradeMode, effectiveConfig); } } -if (currentJobState == JobState.SUSPENDED) { -if (desiredJobState == JobState.RUNNING) { -if (upgradeMode == UpgradeMode.STATELESS) { -deployFlinkJob(flinkApp, effectiveConfig, Optional.empty()); -} else if (upgradeMode == UpgradeMode.SAVEPOINT) { -restoreFromLastSavepoint(flinkApp, effectiveConfig); -} else { -throw new InvalidDeploymentException( -"Only savepoint and stateless strategies are supported at the moment."); -} +if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) { +if (upgradeMode == UpgradeMode.STATELESS) { +deployFlinkJob(flinkApp, effectiveConfig, Optional.empty()); +} else if (upgradeMode == UpgradeMode.SAVEPOINT) { +restoreFromLastSavepoint(flinkApp, effectiveConfig); +} else if (upgradeMode == UpgradeMode.LAST_STATE) { +final String savepointLocation = + flinkApp.getStatus().getJobStatus().getSavepointLocation(); +// Upgrade mode changes from savepoint -> last-state +deployFlinkJob( +flinkApp, effectiveConfig, Optional.ofNullable(savepointLocation)); } } } @@ -116,7 +113,8 @@ public class JobReconciler { private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { LOG.info("Upgrading running job"); -Optional savepoint = cancelJob(flinkApp, effectiveConfig); +final Optional savepoint = +cancelJob(flinkApp, flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig); deployFlinkJob(flinkApp, effectiveConfig, savepoint); } @@ -126,17 +124,20 @@ public class JobReconciler { deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation())); } -private Optional suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig) -throws Exception { -LOG.info("Suspending {}", flinkApp.getMetadata().getName()); -return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig); -} - -private Optional cancelJob(FlinkDeployment