[flink] branch master updated: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

2022-07-15 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e85cf8c4cdf [FLINK-27878][datastream] Add Retry Support For Async I/O 
In DataStream API
e85cf8c4cdf is described below

commit e85cf8c4cdf417b47f8d53bf3bb202f79e92b205
Author: lincoln lee 
AuthorDate: Fri Apr 29 14:56:23 2022 +0800

[FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

This closes #19983.
---
 .../docs/dev/datastream/operators/asyncio.md   |  59 ++-
 .../docs/dev/datastream/operators/asyncio.md   |  72 +++-
 .../streaming/api/datastream/AsyncDataStream.java  | 176 -
 .../api/functions/async/AsyncRetryPredicate.java   |  47 +++
 .../api/functions/async/AsyncRetryStrategy.java|  37 ++
 .../api/operators/async/AsyncWaitOperator.java | 227 +++-
 .../operators/async/AsyncWaitOperatorFactory.java  |  15 +
 .../util/retryable/AsyncRetryStrategies.java   | 240 +
 .../streaming/util/retryable/RetryPredicates.java  |  85 +
 .../api/operators/async/AsyncWaitOperatorTest.java | 240 -
 .../streaming/api/scala/AsyncDataStream.scala  | 397 -
 .../api/scala/async/AsyncRetryPredicate.scala  |  47 +++
 .../api/scala/async/AsyncRetryStrategy.scala   |  34 ++
 .../api/scala/AsyncDataStreamITCase.scala  | 122 ++-
 14 files changed, 1755 insertions(+), 43 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md 
b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index d1054879bef..6dbddab5824 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -30,6 +30,8 @@ under the License.
 For users not familiar with asynchronous or event-driven programming, it is advisable to first acquire some background on Futures and event-driven programming.
 
 Note: Details about the design and implementation of the asynchronous I/O feature can be found in [FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
+For implementation details of the newly added retry support, see [FLIP-232: Add Retry Support For Async I/O In DataStream API](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963).
+
 
 ## The Need for Asynchronous I/O Operations
 
@@ -60,7 +62,7 @@ Flink's Async I/O API allows users to use asynchronous request clients with data streams
 
- An implementation of `AsyncFunction` that dispatches the requests
- A *callback* that takes the result of the database interaction and hands it over to the `ResultFuture`
-- Applying the async I/O operation on a `DataStream` as a transformation.
+- Applying the async I/O operation on a `DataStream` as a transformation, with or without retry enabled.

 The following code template illustrates the basic pattern:
 
@@ -115,10 +117,21 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
 DataStream<String> stream = ...;
 
-// apply the async I/O transformation
+// apply the async I/O transformation, without retry
 DataStream<Tuple2<String, String>> resultStream =
 AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);
 
+// or apply the async I/O transformation with retry enabled
+// create an async retry strategy via the utility class, or a user-defined strategy
+AsyncRetryStrategy asyncRetryStrategy =
+   new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // 
maxAttempts=3, fixedDelay=100ms
+   .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+   .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+   .build();
+
+// apply the async I/O transformation with retry enabled
+DataStream<Tuple2<String, String>> resultStream =
+   AsyncDataStream.unorderedWaitWithRetry(stream, new 
AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -151,10 +164,17 @@ class AsyncDatabaseRequest extends AsyncFunction[String, 
(String, String)] {
 // create the original stream
 val stream: DataStream[String] = ...
 
-// apply the async I/O transformation
+// apply the async I/O transformation, without retry
 val resultStream: DataStream[(String, String)] =
 AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100)
 
+// or apply the async I/O transformation with retry enabled
+// create an async retry strategy
+val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+
+// apply the async I/O transformation with retry enabled
+val resultStream: DataStream[(String, String)] =
+  AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 
1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -164,11 +184,12 @@ val resultStream: DataStream[(String, String)] =
 
 The following parameters control the asynchronous operation:

-  - **Timeout**: The timeout defines how long an asynchronous request may take before it is considered failed. It guards against dead or failed requests.
+  - **Timeout**: The timeout defines how long an asynchronous operation may take to complete before it is finally considered failed, possibly covering multiple retry requests if retry is enabled. It guards against dead or failed requests.

   - **Capacity**: This parameter defines how many asynchronous requests may be in progress at the same time.
 Even though async I/O usually leads to much better throughput, the operator can still become a bottleneck in the streaming application. Limiting the number of concurrent requests ensures that the operator will not accumulate an ever-growing backlog of pending requests, but triggers backpressure once the capacity is exhausted.

+  - **AsyncRetryStrategy**: The retry strategy defines which conditions trigger a delayed retry and what the delay policy is, e.g. fixed delay, exponential backoff, or a custom implementation.
 
 ### Timeout Handling
 
@@ -211,6 +232,16 @@ Flink offers two modes that control in which order the result records are emitted.
 The async I/O operator offers full exactly-once fault-tolerance guarantees. It stores the records of in-flight asynchronous requests in checkpoints and re-triggers the requests upon recovery from a failure.
 
 
+### Retry Support
+
+Retry support introduces a built-in retry mechanism for async I/O operations, one that is transparent to the user's async function implementation.
+
+  - **AsyncRetryStrategy**: The async retry strategy contains the definition of the retry condition `AsyncRetryPredicate`, together with interface methods that decide, based on the current attempt count, whether to continue retrying and how long to delay before the next attempt.
+
+Note that even after the retry condition is met, a retry may be abandoned because the number of attempts exceeds the preset limit, or it may be forcibly terminated when the job is ending (in which case the result or exception of the last attempt is used as the final state).
+
+  - 
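
A custom strategy only needs to implement the interface sketched above. The following is a hedged illustration; it assumes the `AsyncRetryStrategy` and `AsyncRetryPredicate` interfaces introduced by this commit, and the exponential-backoff policy itself is an example rather than part of the commit:

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;

/** Retries up to 3 times with exponential backoff on empty results or any exception. */
public class ExponentialBackoffRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {

    private static final int MAX_ATTEMPTS = 3;
    private static final long INITIAL_DELAY_MS = 100L;

    @Override
    public boolean canRetry(int currentAttempts) {
        return currentAttempts <= MAX_ATTEMPTS;
    }

    @Override
    public long getBackoffTimeMillis(int currentAttempts) {
        // 100 ms, 200 ms, 400 ms, ... for attempts 1, 2, 3, ...
        return INITIAL_DELAY_MS << (currentAttempts - 1);
    }

    @Override
    public AsyncRetryPredicate<OUT> getRetryPredicate() {
        return new AsyncRetryPredicate<OUT>() {
            @Override
            public Optional<Predicate<Collection<OUT>>> resultPredicate() {
                return Optional.of(Collection::isEmpty); // retry on empty results
            }

            @Override
            public Optional<Predicate<Throwable>> exceptionPredicate() {
                return Optional.of(ignored -> true); // retry on any exception
            }
        };
    }
}
```

Such a strategy can then be passed to `AsyncDataStream.unorderedWaitWithRetry` exactly like the fixed-delay strategy in the template above.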

[flink-statefun-playground] branch dependabot/maven/java/connected-components/io.undertow-undertow-core-2.2.15.Final created (now d9bcf04)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/java/connected-components/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


  at d9bcf04  Bump undertow-core in /java/connected-components

No new revisions were added by this update.



[flink-statefun-playground] branch dependabot/maven/java/showcase/io.undertow-undertow-core-2.2.15.Final created (now 4fcdaae)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/java/showcase/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


  at 4fcdaae  Bump undertow-core from 1.4.18.Final to 2.2.15.Final in 
/java/showcase

No new revisions were added by this update.



[flink-statefun-playground] branch dependabot/maven/java/greeter/io.undertow-undertow-core-2.2.15.Final created (now 5c9ba7f)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/java/greeter/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


  at 5c9ba7f  Bump undertow-core from 1.4.18.Final to 2.2.15.Final in 
/java/greeter

No new revisions were added by this update.



[flink-statefun-playground] branch dependabot/maven/java/shopping-cart/io.undertow-undertow-core-2.2.15.Final created (now 78e0bcb)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/java/shopping-cart/io.undertow-undertow-core-2.2.15.Final
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git


  at 78e0bcb  Bump undertow-core in /java/shopping-cart

No new revisions were added by this update.



[flink] branch dependabot/maven/flink-connectors/flink-connector-kinesis/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now eda1b8daf6f)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/flink-connectors/flink-connector-kinesis/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git


  at eda1b8daf6f Bump aws-java-sdk-s3 in 
/flink-connectors/flink-connector-kinesis

No new revisions were added by this update.



[flink] branch dependabot/maven/flink-filesystems/flink-s3-fs-base/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now 801141d5cd7)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/flink-filesystems/flink-s3-fs-base/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git


  at 801141d5cd7 Bump aws-java-sdk-s3 in /flink-filesystems/flink-s3-fs-base

No new revisions were added by this update.



[flink] branch dependabot/maven/flink-yarn/com.amazonaws-aws-java-sdk-s3-1.12.261 created (now 2edaa8a6969)

2022-07-15 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/flink-yarn/com.amazonaws-aws-java-sdk-s3-1.12.261
in repository https://gitbox.apache.org/repos/asf/flink.git


  at 2edaa8a6969 Bump aws-java-sdk-s3 from 1.11.171 to 1.12.261 in 
/flink-yarn

No new revisions were added by this update.



[flink] branch master updated: [hotfix][docs-zh]Fix Chinese document format errors.

2022-07-15 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 23814be4f91 [hotfix][docs-zh]Fix Chinese document format errors.
23814be4f91 is described below

commit 23814be4f916b064be082a5382d9c5c5fab10ff3
Author: liuzhuang2017 
AuthorDate: Tue Jul 12 17:00:59 2022 +0800

[hotfix][docs-zh]Fix Chinese document format errors.
---
 docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md | 4 ++--
 docs/content.zh/docs/ops/state/large_state_tuning.md   | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md 
b/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
index 15a9be9ff12..3383f21d703 100644
--- a/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
+++ b/docs/content.zh/docs/ops/state/checkpointing_under_backpressure.md
@@ -124,8 +124,8 @@ checkpoints. Moreover, savepoints cannot happen concurrently with unaligned checkpoints,
  Interplay with Watermarks

 Unaligned checkpoints change an implicit guarantee about watermarks during recovery. Currently, Flink makes sure that generating the watermark is the first step of recovery,
-instead of storing the latest watermark in the operators to ease rescaling. In unaligned checkpoints, this means that on recovery, ** Flink generates
-watermarks after it restores in-flight data **. If your pipeline uses **an operator that applies the latest watermark on each record**, it will produce
+instead of storing the latest watermark in the operators to ease rescaling. In unaligned checkpoints, this means that on recovery, **Flink generates
+watermarks after it restores in-flight data**. If your pipeline uses **an operator that applies the latest watermark on each record**, it will produce
 **different results** compared to using aligned checkpoints. If your operator depends on the latest watermark being always available, the workaround is to store
 the watermark in operator state. In that case, the watermark should be stored in union state with a single key group to support rescaling.
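
The workaround described in that last paragraph could look roughly like the following sketch. The operator and state names are illustrative assumptions; only the operator hooks and `getUnionListState` come from Flink's public API:

```java
import java.util.Collections;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Keeps the latest seen watermark in union list state so it is available right after recovery. */
public class WatermarkPreservingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private transient ListState<Long> lastWatermarkState;
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // Union state: every subtask sees all entries on restore, which keeps rescaling safe.
        lastWatermarkState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<>("last-watermark", Long.class));
        for (Long wm : lastWatermarkState.get()) {
            lastWatermark = Math.max(lastWatermark, wm);
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        lastWatermarkState.update(Collections.singletonList(lastWatermark));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        lastWatermark = Math.max(lastWatermark, mark.getTimestamp());
        super.processWatermark(mark);
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Business logic may rely on lastWatermark being restored before new records arrive.
        output.collect(element);
    }
}
```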
 
diff --git a/docs/content.zh/docs/ops/state/large_state_tuning.md 
b/docs/content.zh/docs/ops/state/large_state_tuning.md
index 53de77b47a2..a7499ee12ca 100644
--- a/docs/content.zh/docs/ops/state/large_state_tuning.md
+++ b/docs/content.zh/docs/ops/state/large_state_tuning.md
@@ -173,7 +173,7 @@ Flink's design strives to make it efficient to use a very high value for the maximum parallelism, even when running

 ## Compression

-Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java),
+Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses the [snappy compression algorithm (version 1.1.4)](https://github.com/xerial/snappy-java),
 but we plan to support custom compression algorithms in the future. Compression works at the granularity of key-groups in keyed state, i.e. each key-group can be decompressed individually, which is important for rescaling.

 Compression can be activated through the `ExecutionConfig`:
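
The code snippet following this sentence is cut off in this digest; enabling compression looks like this minimal sketch using the public `ExecutionConfig` API:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableSnapshotCompression {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Turn on snappy compression of keyed state in checkpoints and savepoints.
        env.getConfig().setUseSnapshotCompression(true);
    }
}
```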



[flink] branch master updated (c27fd8dc72c -> 3e7aa6ef4b3)

2022-07-15 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from c27fd8dc72c [FLINK-27623][table] Add 
'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
 add 3e7aa6ef4b3 [hotfix][docs-zh] Add the missing working_directory.md 
file to the standalone part.

No new revisions were added by this update.

Summary of changes:
 .../docs/deployment/resource-providers/standalone/working_directory.md| 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 copy docs/{content => 
content.zh}/docs/deployment/resource-providers/standalone/working_directory.md 
(100%)



[flink] branch master updated: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

2022-07-15 Thread godfrey
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c27fd8dc72c [FLINK-27623][table] Add 
'table.exec.async-lookup.output-mode' to ExecutionConfigOptions
c27fd8dc72c is described below

commit c27fd8dc72ceac7631b5f7482db1e9a14b339f68
Author: lincoln lee 
AuthorDate: Mon May 16 12:30:38 2022 +0800

[FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to 
ExecutionConfigOptions

This closes #19759
---
 .../generated/execution_config_configuration.html  |  6 +++
 .../table/api/config/ExecutionConfigOptions.java   | 25 
 .../plan/nodes/exec/batch/BatchExecLookupJoin.java |  1 +
 .../nodes/exec/common/CommonExecLookupJoin.java| 22 --
 .../nodes/exec/stream/StreamExecLookupJoin.java|  4 ++
 .../physical/stream/StreamPhysicalLookupJoin.scala |  3 +-
 .../factories/TestValuesRuntimeFunctions.java  | 11 -
 .../testJoinTemporalTable.out  |  1 +
 ...testJoinTemporalTableWithProjectionPushDown.out |  1 +
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 20 ++---
 .../operators/join/AsyncLookupJoinHarnessTest.java | 47 ++
 11 files changed, 123 insertions(+), 18 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 364d9bf9d81..d2974df5ac6 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -14,6 +14,12 @@
 Integer
 The max number of async i/o operations that the async lookup join can trigger.
 
+
+table.exec.async-lookup.output-mode  Batch Streaming
+ORDERED
+Enum
+Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will still be used. Possible values: "ORDERED", "ALLOW_UNORDERED"
+
 
 table.exec.async-lookup.timeout Batch Streaming
 3 min
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9aa8642dee4..474fc4f9c24 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -323,6 +323,16 @@ public class ExecutionConfigOptions {
 .withDescription(
 "The async timeout for the asynchronous operation 
to complete.");
 
+@Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE =
+key("table.exec.async-lookup.output-mode")
+.enumType(AsyncOutputMode.class)
+.defaultValue(AsyncOutputMode.ORDERED)
+.withDescription(
+"Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
++ "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
++ "affect the correctness of the result, otherwise ORDERED will still be used.");
+
 // 
 //  MiniBatch Options
 // 
@@ -573,6 +583,21 @@ public class ExecutionConfigOptions {
 FORCE
 }
 
+/** Output mode for asynchronous operations, equivalent to {@see 
AsyncDataStream.OutputMode}. */
+@PublicEvolving
+public enum AsyncOutputMode {
+
+/** Ordered output mode, equivalent to {@see 
AsyncDataStream.OutputMode.ORDERED}. */
+ORDERED,
+
+/**
+ * Allow unordered output mode, will attempt to use {@see
+ * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the 
correctness of the
+ * result, otherwise ORDERED will still be used.
+ */
+ALLOW_UNORDERED
+}
+
 /** Determine if CAST operates using the legacy behaviour or the new one. 
*/
 @Deprecated
 public enum LegacyCastBehaviour implements DescribedEnum {
diff --git 
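
For context, setting the new option from user code could look like the following hedged sketch; the option key comes from the commit above, while the surrounding pipeline is illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncLookupOutputModeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Allow unordered output for async lookup joins where it does not affect correctness.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.async-lookup.output-mode", "ALLOW_UNORDERED");
    }
}
```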

[flink-table-store] branch master updated: [FLINK-28560] Support Spark 3.3 profile for SparkSource

2022-07-15 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 15ddb221 [FLINK-28560] Support Spark 3.3 profile for SparkSource
15ddb221 is described below

commit 15ddb22174fb94bd36b176f0eb487f5a1b39d443
Author: Nicholas Jiang 
AuthorDate: Fri Jul 15 15:02:59 2022 +0800

[FLINK-28560] Support Spark 3.3 profile for SparkSource

This closes #217
---
 docs/content/docs/engines/overview.md  |  1 +
 flink-table-store-spark/pom.xml|  6 
 .../flink/table/store/spark/SparkCatalog.java  | 28 +-
 .../flink/table/store/spark/SparkTypeTest.java | 34 +++---
 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/engines/overview.md 
b/docs/content/docs/engines/overview.md
index dc4a3f1c..0bbc14ad 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -41,5 +41,6 @@ Apache Hive and Apache Spark.
 | Spark | 3.0  | read | Projection, Filter |
 | Spark | 3.1  | read | Projection, Filter |
 | Spark | 3.2  | read | Projection, Filter |
+| Spark | 3.3  | read | Projection, Filter |
 | Trino | 358  | read | Projection, Filter |
 | Trino | 388  | read | Projection, Filter |
\ No newline at end of file
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 66609af6..1a29f56a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -76,6 +76,12 @@ under the License.
 
 
 
+
+spark-3.3
+
+3.3.0
+
+
 
 spark-3.2
 
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index b53cb33f..65f1387b 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -185,8 +185,34 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces {
 throw new UnsupportedOperationException("Alter namespace in Spark is 
not supported yet.");
 }
 
-@Override
+/**
+ * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+ * This interface implementation only supports Spark 3.0, 3.1 and 3.2.
+ *
+ * <p>If the catalog implementation does not support this operation, it may throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @return true if the namespace was dropped
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
 public boolean dropNamespace(String[] namespace) {
+return dropNamespace(namespace, true);
+}
+
+/**
+ * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
+ * the namespace if cascade is true. This interface implementation supports Spark 3.3+.
+ *
+ * <p>If the catalog implementation does not support this operation, it may throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
+ * @return true if the namespace was dropped
+ * @throws UnsupportedOperationException If drop is not a supported operation
+ */
+public boolean dropNamespace(String[] namespace, boolean cascade) {
 throw new UnsupportedOperationException("Drop namespace in Spark is 
not supported yet.");
 }
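
To see the two entry points in action: on Spark 3.3+, a cascading drop issued through SQL reaches the new two-argument overload, while Spark 3.0-3.2 resolves the single-argument form, which delegates with cascade = true. A hedged usage sketch follows; the catalog name, warehouse path, and config keys are illustrative, and as the code above shows, the call currently ends in an UnsupportedOperationException:

```java
import org.apache.spark.sql.SparkSession;

public class DropNamespaceExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                // Register the table store catalog under an illustrative name.
                .config("spark.sql.catalog.tablestore",
                        "org.apache.flink.table.store.spark.SparkCatalog")
                .config("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse")
                .getOrCreate();

        // On Spark 3.3+ this resolves to dropNamespace(String[], boolean) with cascade = true;
        // on Spark 3.0-3.2 it resolves to dropNamespace(String[]).
        spark.sql("DROP NAMESPACE IF EXISTS tablestore.my_db CASCADE");
    }
}
```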
 
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index abae4e2d..8ccea4f8 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -73,29 +73,29 @@ public class SparkTypeTest {
 String nestedRowMapType =
 "StructField(locations,MapType("
 + "StringType,"
-+ "StructType(StructField(posX,DoubleType,false), 

[flink-kubernetes-operator] branch main updated: [FLINK-28478] Always complete in-progress update

2022-07-15 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new a643037  [FLINK-28478] Always complete in-progress update
a643037 is described below

commit a6430378553d7405a037f948576bea5ef5be55ce
Author: Gyula Fora 
AuthorDate: Thu Jul 14 14:00:33 2022 +0200

[FLINK-28478] Always complete in-progress update
---
 .../AbstractFlinkResourceReconciler.java   |  4 ++
 .../kubernetes/operator/TestingFlinkService.java   |  9 ++-
 .../deployment/SessionReconcilerTest.java  | 67 --
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index b0d8abe..7758ac6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -270,6 +270,10 @@ public abstract class AbstractFlinkResourceReconciler<
  * @return True if desired spec was already deployed.
  */
 private boolean checkNewSpecAlreadyDeployed(CR resource, Configuration 
deployConf) {
+if (resource.getStatus().getReconciliationStatus().getState()
+== ReconciliationState.UPGRADING) {
+return false;
+}
 AbstractFlinkSpec deployedSpec = 
ReconciliationUtils.getDeployedSpec(resource);
 if (resource.getSpec().equals(deployedSpec)) {
 LOG.info(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 439b3ec..64a0b6d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -144,6 +144,10 @@ public class TestingFlinkService extends FlinkService {
 sessions.clear();
 }
 
+public Set<String> getSessions() {
+return sessions;
+}
+
 @Override
 public void submitApplicationCluster(
 JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) 
throws Exception {
@@ -186,7 +190,10 @@ public class TestingFlinkService extends FlinkService {
 }
 
 @Override
-public void submitSessionCluster(Configuration conf) {
+public void submitSessionCluster(Configuration conf) throws Exception {
+if (deployFailure) {
+throw new Exception("Deployment failure");
+}
 sessions.add(conf.get(KubernetesConfigOptions.CLUSTER_ID));
 }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 2e13d28..2eb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -23,16 +23,20 @@ import 
org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Tests for {@link 
org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler}.
@@ -41,17 +45,21 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class SessionReconcilerTest {
 
 private final FlinkConfigManager configManager = new 

[flink] branch master updated: [FLINK-27991][table-planner] ORC format supports reporting statistics

2022-07-15 Thread godfrey
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new a521e82a473 [FLINK-27991][table-planner] ORC format supports reporting 
statistics
a521e82a473 is described below

commit a521e82a47384ad88e2424f9f6734f0c6d1f9b14
Author: zhengyunhong.zyh <337361...@qq.com>
AuthorDate: Wed Jul 13 14:59:20 2022 +0800

[FLINK-27991][table-planner] ORC format supports reporting statistics

This closes #20009
---
 flink-formats/flink-orc/pom.xml|   8 +
 .../org/apache/flink/orc/OrcFileFormatFactory.java | 202 +-
 .../orc/OrcFileSystemStatisticsReportTest.java |  51 +
 .../flink/orc/OrcFormatStatisticsReportTest.java   | 236 +
 .../planner/utils/StatisticsReportTestBase.java|   4 +-
 5 files changed, 497 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 050ce79cab4..6aa5cceb4f7 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -109,6 +109,14 @@ under the License.
 

 
+   
+   
+   com.google.guava
+   guava
+   ${guava.version}
+   test
+   
+

org.apache.flink
flink-test-utils
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 1e0bf4249d5..02057fd66be 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.orc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -26,12 +27,14 @@ import 
org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import 
org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -40,17 +43,34 @@ import 
org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ColumnStatisticsImpl;
 
+import java.io.IOException;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -115,9 +135,12 @@ public class OrcFileFormatFactory implements 
BulkReaderFormatFactory, BulkWriter
 };
 }
 
-private static class OrcBulkDecodingFormat
+/** OrcBulkDecodingFormat which implements {@link 
FileBasedStatisticsReportableInputFormat}. */
+@VisibleForTesting
+public static class OrcBulkDecodingFormat
 implements BulkDecodingFormat<RowData>,
-