[flink] branch master updated: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new e85cf8c4cdf [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

e85cf8c4cdf is described below

commit e85cf8c4cdf417b47f8d53bf3bb202f79e92b205
Author: lincoln lee
AuthorDate: Fri Apr 29 14:56:23 2022 +0800

    [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

    This closes #19983.
---
 .../docs/dev/datastream/operators/asyncio.md       |  59 ++-
 .../docs/dev/datastream/operators/asyncio.md       |  72 +++-
 .../streaming/api/datastream/AsyncDataStream.java  | 176 -
 .../api/functions/async/AsyncRetryPredicate.java   |  47 +++
 .../api/functions/async/AsyncRetryStrategy.java    |  37 ++
 .../api/operators/async/AsyncWaitOperator.java     | 227 +++-
 .../operators/async/AsyncWaitOperatorFactory.java  |  15 +
 .../util/retryable/AsyncRetryStrategies.java       | 240 +
 .../streaming/util/retryable/RetryPredicates.java  |  85 +
 .../api/operators/async/AsyncWaitOperatorTest.java | 240 -
 .../streaming/api/scala/AsyncDataStream.scala      | 397 -
 .../api/scala/async/AsyncRetryPredicate.scala      |  47 +++
 .../api/scala/async/AsyncRetryStrategy.scala       |  34 ++
 .../api/scala/AsyncDataStreamITCase.scala          | 122 ++-
 14 files changed, 1755 insertions(+), 43 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index d1054879bef..6dbddab5824 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -30,6 +30,8 @@ under the License.
 对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。

 提示:这篇文档 [FLIP-12: 异步 I/O 的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步 I/O 功能的细节。

+对于新增的重试支持的实现细节可以参考[FLIP-232: 为 DataStream API 异步 I/O 操作增加重试支持](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)。
+

 ## 对于异步 I/O 操作的需求

@@ -60,7 +62,7 @@ Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端

 - 实现分发请求的 `AsyncFunction`
 - 获取数据库交互的结果并发送给 `ResultFuture` 的 *回调* 函数
-- 将异步 I/O 操作应用于 `DataStream` 作为 `DataStream` 的一次转换操作。
+- 将异步 I/O 操作应用于 `DataStream` 作为 `DataStream` 的一次转换操作, 启用或者不启用重试。

 下面是基本的代码模板:

@@ -115,10 +117,21 @@ class AsyncDatabaseRequest extends RichAsyncFunction

 DataStream stream = ...;

-// 应用异步 I/O 转换操作
+// 应用异步 I/O 转换操作,不启用重试
 DataStream> resultStream =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

+// 或 应用异步 I/O 转换操作并启用重试
+// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
+AsyncRetryStrategy asyncRetryStrategy =
+    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
+        .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+        .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+        .build();
+
+// 应用异步 I/O 转换操作并启用重试
+DataStream> resultStream =
+    AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}

@@ -151,10 +164,17 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

 // 创建初始 DataStream
 val stream: DataStream[String] = ...

-// 应用异步 I/O 转换操作
+// 应用异步 I/O 转换操作,不启用重试
 val resultStream: DataStream[(String, String)] =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

+// 或 应用异步 I/O 转换操作并启用重试
+// 创建一个异步重试策略
+val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+
+// 应用异步 I/O 转换操作并启用重试
+val resultStream: DataStream[(String, String)] =
+    AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy)
 ```
 {{< /tab >}}
 {{< /tabs >}}

@@ -164,11 +184,12 @@

 下面两个参数控制异步操作:

- - **Timeout**: 超时参数定义了异步请求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的请求。
+ - **Timeout**: 超时参数定义了异步操作执行多久未完成、最终认定为失败的时长,如果启用重试,则可能包括多个重试请求。 它可以防止一直等待得不到响应的请求。

 - **Capacity**: 容量参数定义了可以同时进行的异步请求数。 即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。

+ - **AsyncRetryStrategy**: 重试策略参数定义了什么条件会触发延迟重试以及延迟的策略,例如,固定延迟、指数后退延迟、自定义实现等。

 ### 超时处理

@@ -211,6 +232,16 @@ Flink 提供两种模式控制结果记录以何种顺序发出。

 异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。

+### 重试支持
+
+重试支持为异步 I/O 操作引入了一个内置重试机制,它对用户的异步函数实现逻辑是透明的。
+
+ - **AsyncRetryStrategy**: 异步重试策略包含了触发重试条件 `AsyncRetryPredicate` 定义,以及根据当前已尝试次数判断是否继续重试、下次重试间隔时长的接口方法。
+   需要注意,在满足触发重试条件后,有可能因为当前重试次数超过预设的上限放弃重试,或是在任务结束时被强制终止重试(这种情况下,系统以最后一次执行的结果或异常作为最终状态)。
+
+ -
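The FLINK-27878 commit above adds `AsyncDataStream.unorderedWaitWithRetry` together with `AsyncRetryStrategies` and `RetryPredicates`. Outside of Flink, the core control flow — re-issue an async call after a fixed delay when the result is rejected (e.g. empty) or an exception occurred, up to a maximum number of attempts — can be sketched in plain Java. This is an illustrative sketch only, not the Flink implementation; all class and method names below are invented:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Illustrative plain-Java sketch of fixed-delay async retry (NOT the Flink API):
 * re-issue the async call when the result is rejected or an exception occurred,
 * until maxAttempts is reached; the last attempt's outcome becomes the final one.
 */
class FixedDelayRetrySketch {

    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call,   // issues one async attempt
            Predicate<T> retryOnResult,            // e.g. "result is empty"
            int maxAttempts,
            long fixedDelayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, retryOnResult, maxAttempts, fixedDelayMillis, 1, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            Predicate<T> retryOnResult,
            int maxAttempts,
            long delayMillis,
            int attemptNo,
            CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            boolean wantRetry = error != null || retryOnResult.test(value);
            if (wantRetry && attemptNo < maxAttempts) {
                try {
                    Thread.sleep(delayMillis); // fixed backoff between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                attempt(call, retryOnResult, maxAttempts, delayMillis, attemptNo + 1, result);
            } else if (error != null) {
                result.completeExceptionally(error); // retries exhausted
            } else {
                result.complete(value); // accepted, or the last attempt's value
            }
        });
    }
}
```

As in the commit's `AsyncRetryStrategy`, the retry decision is driven by predicates on the result and the exception, and giving up after the attempt cap leaves the last outcome as the final state.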
[flink-statefun-playground] branch dependabot/maven/java/connected-components/io.undertow-undertow-core-2.2.15.Final created (now d9bcf04)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/java/connected-components/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

      at d9bcf04  Bump undertow-core in /java/connected-components

No new revisions were added by this update.
[flink-statefun-playground] branch dependabot/maven/java/showcase/io.undertow-undertow-core-2.2.15.Final created (now 4fcdaae)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/java/showcase/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

      at 4fcdaae  Bump undertow-core from 1.4.18.Final to 2.2.15.Final in /java/showcase

No new revisions were added by this update.
[flink-statefun-playground] branch dependabot/maven/java/greeter/io.undertow-undertow-core-2.2.15.Final created (now 5c9ba7f)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/java/greeter/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

      at 5c9ba7f  Bump undertow-core from 1.4.18.Final to 2.2.15.Final in /java/greeter

No new revisions were added by this update.
[flink-statefun-playground] branch dependabot/maven/java/shopping-cart/io.undertow-undertow-core-2.2.15.Final created (now 78e0bcb)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/java/shopping-cart/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

      at 78e0bcb  Bump undertow-core in /java/shopping-cart

No new revisions were added by this update.
[flink] branch dependabot/maven/flink-connectors/flink-connector-kinesis/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now eda1b8daf6f)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/flink-connectors/flink-connector-kinesis/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git

      at eda1b8daf6f  Bump aws-java-sdk-s3 in /flink-connectors/flink-connector-kinesis

No new revisions were added by this update.
[flink] branch dependabot/maven/flink-filesystems/flink-s3-fs-base/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now 801141d5cd7)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/flink-filesystems/flink-s3-fs-base/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git

      at 801141d5cd7  Bump aws-java-sdk-s3 in /flink-filesystems/flink-s3-fs-base

No new revisions were added by this update.
[flink] branch dependabot/maven/flink-yarn/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now 2edaa8a6969)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/maven/flink-yarn/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git

      at 2edaa8a6969  Bump aws-java-sdk-s3 from 1.11.171 to 1.12.261 in /flink-yarn

No new revisions were added by this update.
[flink] branch master updated: [hotfix][docs-zh]Fix Chinese document format errors.
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 23814be4f91 [hotfix][docs-zh]Fix Chinese document format errors.

23814be4f91 is described below

commit 23814be4f916b064be082a5382d9c5c5fab10ff3
Author: liuzhuang2017
AuthorDate: Tue Jul 12 17:00:59 2022 +0800

    [hotfix][docs-zh]Fix Chinese document format errors.
---
 docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md | 4 ++--
 docs/content.zh/docs/ops/state/large_state_tuning.md               | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md b/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
index 15a9be9ff12..3383f21d703 100644
--- a/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
+++ b/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
@@ -124,8 +124,8 @@ Checkpoint。此外,Savepoint 也不能与非对齐 Checkpoint 同时发生,

 与 Watermark 的相互影响

 非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证。目前,Flink 确保了 Watermark 作为恢复的第一步,
-而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。在非对齐 Checkpoint 中,这意味着当恢复时,** Flink 会在恢复
-In-flight 数据后再生成 Watermark **。如果您的 Pipeline 中使用了**对每条记录都应用最新的 Watermark 的算子**将会相对于
+而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。在非对齐 Checkpoint 中,这意味着当恢复时,**Flink 会在恢复
+In-flight 数据后再生成 Watermark**。如果您的 Pipeline 中使用了**对每条记录都应用最新的 Watermark 的算子**将会相对于
 使用对齐 Checkpoint产生**不同的结果**。如果您的 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在
 OperatorState 中。在这种情况下,Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。

diff --git a/docs/content.zh/docs/ops/state/large_state_tuning.md b/docs/content.zh/docs/ops/state/large_state_tuning.md
index 53de77b47a2..a7499ee12ca 100644
--- a/docs/content.zh/docs/ops/state/large_state_tuning.md
+++ b/docs/content.zh/docs/ops/state/large_state_tuning.md
@@ -173,7 +173,7 @@ Flink 的设计力求使最大并行度的值达到很高的效率,即使执

 ## 压缩

-Flink 为所有 checkpoints 和 savepoints 提供可选的压缩(默认:关闭)。 目前,压缩总是使用 [snappy 压缩算法(版本 1.1.4)](https://github.com/xerial/snappy-java),
+Flink 为所有 checkpoints 和 savepoints 提供可选的压缩(默认:关闭)。 目前,压缩总是使用 [snappy 压缩算法(版本 1.1.4)](https://github.com/xerial/snappy-java),
 但我们计划在未来支持自定义压缩算法。 压缩作用于 keyed state 下 key-groups 的粒度,即每个 key-groups 可以单独解压缩,这对于重新缩放很重要。

 可以通过 `ExecutionConfig` 开启压缩:
[flink] branch master updated (c27fd8dc72c -> 3e7aa6ef4b3)
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

    from c27fd8dc72c [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
     add 3e7aa6ef4b3 [hotfix][docs-zh] Add missing the working_directory.md file to the standalone part.

No new revisions were added by this update.

Summary of changes:
 .../docs/deployment/resource-providers/standalone/working_directory.md | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 copy docs/{content => content.zh}/docs/deployment/resource-providers/standalone/working_directory.md (100%)
[flink] branch master updated: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new c27fd8dc72c [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

c27fd8dc72c is described below

commit c27fd8dc72ceac7631b5f7482db1e9a14b339f68
Author: lincoln lee
AuthorDate: Mon May 16 12:30:38 2022 +0800

    [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

    This closes #19759
---
 .../generated/execution_config_configuration.html  |  6 +++
 .../table/api/config/ExecutionConfigOptions.java   | 25 
 .../plan/nodes/exec/batch/BatchExecLookupJoin.java |  1 +
 .../nodes/exec/common/CommonExecLookupJoin.java    | 22 --
 .../nodes/exec/stream/StreamExecLookupJoin.java    |  4 ++
 .../physical/stream/StreamPhysicalLookupJoin.scala |  3 +-
 .../factories/TestValuesRuntimeFunctions.java      | 11 -
 .../testJoinTemporalTable.out                      |  1 +
 ...testJoinTemporalTableWithProjectionPushDown.out |  1 +
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 20 ++---
 .../operators/join/AsyncLookupJoinHarnessTest.java | 47 ++
 11 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 364d9bf9d81..d2974df5ac6 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -14,6 +14,12 @@
 Integer
 The max number of async i/o operation that the async lookup join can trigger.
+
+table.exec.async-lookup.output-mode Batch Streaming
+ORDERED
+Enum
+Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used.Possible values:"ORDERED""ALLOW_UNORDERED"
+
 table.exec.async-lookup.timeout Batch Streaming
 3 min

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9aa8642dee4..474fc4f9c24 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -323,6 +323,16 @@ public class ExecutionConfigOptions {
                 .withDescription(
                         "The async timeout for the asynchronous operation to complete.");

+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE =
+            key("table.exec.async-lookup.output-mode")
+                    .enumType(AsyncOutputMode.class)
+                    .defaultValue(AsyncOutputMode.ORDERED)
+                    .withDescription(
+                            "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
+                                    + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
+                                    + "affect the correctness of the result, otherwise ORDERED will be still used.");
+
 //
 // MiniBatch Options
 //
@@ -573,6 +583,21 @@ public class ExecutionConfigOptions {
         FORCE
     }

+    /** Output mode for asynchronous operations, equivalent to {@see AsyncDataStream.OutputMode}. */
+    @PublicEvolving
+    public enum AsyncOutputMode {
+
+        /** Ordered output mode, equivalent to {@see AsyncDataStream.OutputMode.ORDERED}. */
+        ORDERED,
+
+        /**
+         * Allow unordered output mode, will attempt to use {@see
+         * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the
+         * result, otherwise ORDERED will be still used.
+         */
+        ALLOW_UNORDERED
+    }
+
 /** Determine if CAST operates using the legacy behaviour or the new one. */
 @Deprecated
 public enum LegacyCastBehaviour implements DescribedEnum {

diff --git
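The new option from FLINK-27623 is set like any other `ExecutionConfigOptions` entry, for example via a `SET` statement in the SQL client. A minimal usage sketch (whether `ALLOW_UNORDERED` is safe depends on the query — as the option's description notes, the planner only falls back to unordered output when it does not affect result correctness):

```sql
-- Default is ORDERED; ALLOW_UNORDERED lets the planner use
-- AsyncDataStream.OutputMode.UNORDERED when correctness is unaffected.
SET 'table.exec.async-lookup.output-mode' = 'ALLOW_UNORDERED';
```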
[flink-table-store] branch master updated: [FLINK-28560] Support Spark 3.3 profile for SparkSource
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

The following commit(s) were added to refs/heads/master by this push:
     new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource

15ddb221 is described below

commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443
Author: Nicholas Jiang
AuthorDate: Fri Jul 15 15:02:59 2022 +0800

    [FLINK-28560] Support Spark 3.3 profile for SparkSource

    This closes #217
---
 docs/content/docs/engines/overview.md              |  1 +
 flink-table-store-spark/pom.xml                    |  6 
 .../flink/table/store/spark/SparkCatalog.java      | 28 +-
 .../flink/table/store/spark/SparkTypeTest.java     | 34 +++---
 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index dc4a3f1c..0bbc14ad 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -41,5 +41,6 @@ Apache Hive and Apache Spark.
 | Spark | 3.0 | read | Projection, Filter |
 | Spark | 3.1 | read | Projection, Filter |
 | Spark | 3.2 | read | Projection, Filter |
+| Spark | 3.3 | read | Projection, Filter |
 | Trino | 358 | read | Projection, Filter |
 | Trino | 388 | read | Projection, Filter |
\ No newline at end of file

diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 66609af6..1a29f56a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -76,6 +76,12 @@ under the License.
+
+    spark-3.3
+
+    3.3.0
+
+
 spark-3.2

diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index b53cb33f..65f1387b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
     }

-    @Override
+    /**
+     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+     * This interface implementation only supports the Spark 3.0, 3.1 and 3.2.
+     *
+     * If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
     public boolean dropNamespace(String[] namespace) {
+        return dropNamespace(namespace, true);
+    }
+
+    /**
+     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+     * the namespace if cascade is true. This interface implementation supports the Spark 3.3+.
+     *
+     * If the catalog implementation does not support this operation, it may throw {@link
+     * UnsupportedOperationException}.
+     *
+     * @param namespace a multi-part namespace
+     * @param cascade When true, deletes all objects under the namespace
+     * @return true if the namespace was dropped
+     * @throws UnsupportedOperationException If drop is not a supported operation
+     */
+    public boolean dropNamespace(String[] namespace, boolean cascade) {
         throw new UnsupportedOperationException("Drop namespace in Spark is not supported yet.");
     }

diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index abae4e2d..8ccea4f8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -73,29 +73,29 @@ public class SparkTypeTest {
         String nestedRowMapType =
                 "StructField(locations,MapType("
                         + "StringType,"
-                        + "StructType(StructField(posX,DoubleType,false),
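The `SparkCatalog` change above supports two generations of Spark's `SupportsNamespaces` API by having the pre-3.3 `dropNamespace(String[])` delegate to the new 3.3+ cascade variant. The pattern can be shown with a toy catalog; the in-memory namespace set and class name below are invented for illustration:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy sketch of the compatibility pattern used in the SparkCatalog change:
 * the pre-3.3 single-argument dropNamespace delegates to the Spark 3.3+
 * cascade variant, so one class can serve both API generations.
 */
class CompatCatalogSketch {

    private final Set<String> namespaces = new HashSet<>(Set.of("db1"));

    /** Spark 3.0/3.1/3.2 signature: always drops recursively. */
    boolean dropNamespace(String[] namespace) {
        return dropNamespace(namespace, true);
    }

    /** Spark 3.3+ signature with an explicit cascade flag. */
    boolean dropNamespace(String[] namespace, boolean cascade) {
        // A real catalog would check for emptiness when cascade == false.
        return namespaces.remove(String.join(".", namespace));
    }
}
```

Because only the old method carries `@Override`-free code calling into the new one, the class compiles whether or not the two-argument method exists on the interface being implemented.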
[flink-kubernetes-operator] branch main updated: [FLINK-28478] Always complete in-progress update
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

The following commit(s) were added to refs/heads/main by this push:
     new a643037 [FLINK-28478] Always complete in-progress update

a643037 is described below

commit a6430378553d7405a037f948576bea5ef5be55ce
Author: Gyula Fora
AuthorDate: Thu Jul 14 14:00:33 2022 +0200

    [FLINK-28478] Always complete in-progress update
---
 .../AbstractFlinkResourceReconciler.java           |  4 ++
 .../kubernetes/operator/TestingFlinkService.java   |  9 ++-
 .../deployment/SessionReconcilerTest.java          | 67 --
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index b0d8abe..7758ac6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -270,6 +270,10 @@ public abstract class AbstractFlinkResourceReconciler<
      * @return True if desired spec was already deployed.
      */
     private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration deployConf) {
+        if (resource.getStatus().getReconciliationStatus().getState()
+                == ReconciliationState.UPGRADING) {
+            return false;
+        }
         AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(resource);
         if (resource.getSpec().equals(deployedSpec)) {
             LOG.info(

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 439b3ec..64a0b6d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -144,6 +144,10 @@ public class TestingFlinkService extends FlinkService {
         sessions.clear();
     }

+    public Set getSessions() {
+        return sessions;
+    }
+
     @Override
     public void submitApplicationCluster(
             JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
@@ -186,7 +190,10 @@ public class TestingFlinkService extends FlinkService {
     }

     @Override
-    public void submitSessionCluster(Configuration conf) {
+    public void submitSessionCluster(Configuration conf) throws Exception {
+        if (deployFailure) {
+            throw new Exception("Deployment failure");
+        }
         sessions.add(conf.get(KubernetesConfigOptions.CLUSTER_ID));
     }

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 2e13d28..2eb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -23,16 +23,20 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;

 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;

 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 /**
  * Tests for {@link org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler}.
@@ -41,17 +45,21 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class SessionReconcilerTest {

     private final FlinkConfigManager configManager = new
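The fix in `checkNewSpecAlreadyDeployed` above is a small guard: while the resource's reconciliation state is `UPGRADING`, the desired spec must never be treated as "already deployed", even if it matches the last deployed spec, so the in-progress update is always completed. The decision logic can be isolated in a toy sketch; the simplified types below stand in for the operator's CRD status classes:

```java
/**
 * Toy sketch of the FLINK-28478 guard: a resource that is mid-upgrade
 * (state UPGRADING) must not short-circuit on "spec already deployed" —
 * the in-progress update always has to be completed first.
 */
class SpecCheckSketch {

    enum ReconciliationState { DEPLOYED, UPGRADING }

    static boolean newSpecAlreadyDeployed(
            String desiredSpec, String deployedSpec, ReconciliationState state) {
        if (state == ReconciliationState.UPGRADING) {
            return false; // always finish an in-progress upgrade
        }
        return desiredSpec.equals(deployedSpec);
    }
}
```

Without the guard, a crash between recording `UPGRADING` and finishing the deploy could leave the spec "equal" but the cluster in an undefined state, and the reconciler would wrongly skip the redeploy.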
[flink] branch master updated: [FLINK-27991][table-planner] ORC format supports reporting statistics
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new a521e82a473 [FLINK-27991][table-planner] ORC format supports reporting statistics

a521e82a473 is described below

commit a521e82a47384ad88e2424f9f6734f0c6d1f9b14
Author: zhengyunhong.zyh <337361...@qq.com>
AuthorDate: Wed Jul 13 14:59:20 2022 +0800

    [FLINK-27991][table-planner] ORC format supports reporting statistics

    This closes #20009
---
 flink-formats/flink-orc/pom.xml                    |   8 +
 .../org/apache/flink/orc/OrcFileFormatFactory.java | 202 +-
 .../orc/OrcFileSystemStatisticsReportTest.java     |  51 +
 .../flink/orc/OrcFormatStatisticsReportTest.java   | 236 +
 .../planner/utils/StatisticsReportTestBase.java    |   4 +-
 5 files changed, 497 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 050ce79cab4..6aa5cceb4f7 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -109,6 +109,14 @@ under the License.
+
+        com.google.guava
+        guava
+        ${guava.version}
+        test
+
 org.apache.flink
 flink-test-utils

diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 1e0bf4249d5..02057fd66be 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -18,6 +18,7 @@

 package org.apache.flink.orc;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -26,12 +27,14 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -40,17 +43,34 @@ import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ColumnStatisticsImpl;

+import java.io.IOException;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;

@@ -115,9 +135,12 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         };
     }

-    private static class OrcBulkDecodingFormat
+    /** OrcBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}. */
+    @VisibleForTesting
+    public static class OrcBulkDecodingFormat
 implements BulkDecodingFormat,
-
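The FLINK-27991 change above makes the ORC decoding format implement `FileBasedStatisticsReportableInputFormat`, i.e. it reads per-column statistics (min/max/null counts) from ORC file footers and reports them as table-level statistics to the planner. The essential merging step — combining the same column's statistics across files — can be illustrated with a toy sketch; `ColumnStat` below is an invented stand-in for ORC's `ColumnStatistics` subclasses, not the actual Flink/ORC API:

```java
/**
 * Illustrative sketch (NOT the Flink/ORC API) of what "reporting statistics"
 * involves for a columnar format: per-file, per-column footer statistics are
 * merged into table-level stats the planner can use for cost estimation.
 */
class OrcStatsSketch {

    /** Minimal per-column statistics as found in a file footer (long column). */
    record ColumnStat(long valueCount, long nullCount, long min, long max) {}

    /** Merges the same column's statistics from two files. */
    static ColumnStat merge(ColumnStat a, ColumnStat b) {
        return new ColumnStat(
                a.valueCount() + b.valueCount(),   // counts add up
                a.nullCount() + b.nullCount(),
                Math.min(a.min(), b.min()),        // range widens
                Math.max(a.max(), b.max()));
    }
}
```

In the real factory this merged information ends up in a `TableStats`/`ColumnStats` object, with one such fold per column type (integer, double, decimal, string, date, timestamp), matching the ORC statistics classes imported in the diff.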