[flink-table-store] branch master updated: [hotfix] Fix BinaryRowTypeSerializer and add test

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new f8cde15a [hotfix] Fix BinaryRowTypeSerializer and add test
f8cde15a is described below

commit f8cde15ac1b9c221b85fabef6e25429ee766fb25
Author: JingsongLi 
AuthorDate: Tue Feb 14 15:41:30 2023 +0800

[hotfix] Fix BinaryRowTypeSerializer and add test
---
 .../store/connector/BinaryRowTypeSerializer.java   |  6 ++-
 .../connector/BinaryRowTypeSerializerTest.java | 57 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java
index 3e63a67e..6a4545c7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java
@@ -120,7 +120,11 @@ public final class BinaryRowTypeSerializer extends TypeSerializer<BinaryRow> {
 return new BinaryRowTypeSerializerSnapshot(serializer.getArity());
 }
 
-private static class BinaryRowTypeSerializerSnapshot
+/**
+ * {@link TypeSerializerSnapshot} for {@link BinaryRow}. It checks the compatibility of
+ * numFields without checking type.
+ */
+public static class BinaryRowTypeSerializerSnapshot
 implements TypeSerializerSnapshot<BinaryRow> {
 
 private int numFields;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java
new file mode 100644
index ..8d0c1582
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.store.data.BinaryRow;
+
+import java.util.Random;
+
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+
+/** Test for {@link BinaryRowTypeSerializer}. */
+public class BinaryRowTypeSerializerTest extends SerializerTestBase<BinaryRow> {
+
+@Override
+protected TypeSerializer<BinaryRow> createSerializer() {
+return new BinaryRowTypeSerializer(2);
+}
+
+@Override
+protected int getLength() {
+return -1;
+}
+
+@Override
+protected Class<BinaryRow> getTypeClass() {
+return BinaryRow.class;
+}
+
+@Override
+protected BinaryRow[] getTestData() {
+Random rnd = new Random();
+return new BinaryRow[] {
+row(1, 1),
+row(2, 2),
+row(rnd.nextInt(), rnd.nextInt()),
+row(rnd.nextInt(), rnd.nextInt())
+};
+}
+}
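
For readers without the full file: the snapshot follows the standard Flink TypeSerializerSnapshot contract, and the point of the fix is that compatibility is decided by arity alone. Below is a minimal sketch of such a snapshot, assuming only the arity-based constructor and the getArity() accessor visible in the diff; the class name and method bodies are illustrative, not the committed code.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.store.data.BinaryRow;

import java.io.IOException;

/** Illustrative stand-in for the snapshot class shown in the diff above. */
public class ArityOnlySnapshot implements TypeSerializerSnapshot<BinaryRow> {

    private int numFields;

    public ArityOnlySnapshot() {} // required for reflective instantiation on restore

    public ArityOnlySnapshot(int numFields) {
        this.numFields = numFields;
    }

    @Override
    public int getCurrentVersion() {
        return 1;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        out.writeInt(numFields); // the arity is the only persisted state
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
            throws IOException {
        this.numFields = in.readInt();
    }

    @Override
    public TypeSerializer<BinaryRow> restoreSerializer() {
        return new BinaryRowTypeSerializer(numFields);
    }

    @Override
    public TypeSerializerSchemaCompatibility<BinaryRow> resolveSchemaCompatibility(
            TypeSerializer<BinaryRow> newSerializer) {
        // Compatible iff the arity matches; field types are deliberately not compared.
        return newSerializer instanceof BinaryRowTypeSerializer
                        && ((BinaryRowTypeSerializer) newSerializer).getArity() == numFields
                ? TypeSerializerSchemaCompatibility.compatibleAsIs()
                : TypeSerializerSchemaCompatibility.incompatible();
    }
}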



[flink-table-store] branch master updated: [FLINK-31032] Supports AND push down for orc format

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 39e8b774 [FLINK-31032] Supports AND push down for orc format
39e8b774 is described below

commit 39e8b7744713c3219f83de73405a9f2c2d9c391c
Author: Shammon FY 
AuthorDate: Tue Feb 14 14:22:33 2023 +0800

[FLINK-31032] Supports AND push down for orc format

This closes #527
---
 .../table/store/format/orc/filter/OrcFilters.java  | 28 ++
 .../orc/filter/OrcPredicateFunctionVisitor.java| 11 -
 .../store/format/orc/OrcFilterConverterTest.java   | 19 +++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
index 236ce65d..ce18540b 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java
@@ -344,4 +344,32 @@ public class OrcFilters {
 return "OR(" + Arrays.toString(preds) + ")";
 }
 }
+
+/** An AND predicate that can be evaluated by the OrcInputFormat. */
+public static class And extends Predicate {
+private final Predicate[] preds;
+
+/**
+ * Creates an AND predicate.
+ *
+ * @param predicates The conjunctive predicates.
+ */
+public And(Predicate... predicates) {
+this.preds = predicates;
+}
+
+@Override
+public SearchArgument.Builder add(SearchArgument.Builder builder) {
+SearchArgument.Builder withAnd = builder.startAnd();
+for (Predicate pred : preds) {
+withAnd = pred.add(withAnd);
+}
+return withAnd.end();
+}
+
+@Override
+public String toString() {
+return "AND(" + Arrays.toString(preds) + ")";
+}
+}
 }
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
index ccbb3fee..e1edf0b4 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java
@@ -123,7 +123,16 @@ public class OrcPredicateFunctionVisitor
 
 @Override
 public Optional<OrcFilters.Predicate> visitAnd(List<Optional<OrcFilters.Predicate>> children) {
-return Optional.empty();
+if (children.size() != 2) {
+throw new RuntimeException("Illegal and children: " + children.size());
+}
+
+Optional<OrcFilters.Predicate> c1 = children.get(0);
+if (!c1.isPresent()) {
+return Optional.empty();
+}
+Optional<OrcFilters.Predicate> c2 = children.get(1);
+return c2.map(value -> new OrcFilters.And(c1.get(), value));
 }
 
 @Override
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
index 702847a2..95af1295 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java
@@ -73,6 +73,25 @@ public class OrcFilterConverterTest {
 new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1),
 new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 2)),
 new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 3)));
+
+test(
+builder.between(0, 1L, 3L),
+new OrcFilters.And(
+new OrcFilters.Not(
+new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 1)),
+new OrcFilters.LessThanEquals("long1", PredicateLeaf.Type.LONG, 3)));
+
+test(
+builder.notIn(0, Arrays.asList(1L, 2L, 3L)),
+new OrcFilters.And(
+new OrcFilters.And(
+new OrcFilters.Not(
+new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1)),
+new OrcFilters.Not(
+new OrcFilters.Equals(
+   
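
The visitor only ever sees binary ANDs, so an n-way conjunction such as the notIn case above folds into nested two-child And nodes. A standalone sketch of that folding logic; Pred and And here are hypothetical stand-ins for OrcFilters.Predicate and OrcFilters.And, used only to keep the example self-contained:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class VisitAndSketch {

    interface Pred {} // stand-in for OrcFilters.Predicate

    static final class And implements Pred {
        final Pred left;
        final Pred right;

        And(Pred left, Pred right) {
            this.left = left;
            this.right = right;
        }
    }

    // Mirrors visitAnd above: exactly two children; if either child could not be
    // converted to an ORC predicate, the whole conjunction is not pushed down.
    static Optional<Pred> visitAnd(List<Optional<Pred>> children) {
        if (children.size() != 2) {
            throw new RuntimeException("Illegal and children: " + children.size());
        }
        Optional<Pred> c1 = children.get(0);
        if (!c1.isPresent()) {
            return Optional.empty();
        }
        return children.get(1).map(c2 -> new And(c1.get(), c2));
    }

    public static void main(String[] args) {
        Pred p1 = new Pred() {}, p2 = new Pred() {}, p3 = new Pred() {};
        // p1 AND p2 AND p3 arrives as and(and(p1, p2), p3):
        Optional<Pred> folded =
                visitAnd(Arrays.asList(
                        visitAnd(Arrays.asList(Optional.of(p1), Optional.of(p2))),
                        Optional.of(p3)));
        System.out.println(folded.isPresent()); // true
        // One unconvertible leaf disables pushdown for the whole AND:
        System.out.println(
                visitAnd(Arrays.asList(Optional.of(p1), Optional.empty())).isPresent()); // false
    }
}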

[flink] branch release-1.17 updated (e1c6352d18a -> 58ca4d49c77)

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


from e1c6352d18a [FLINK-30917][runtime] Let adaptive batch scheduler also 
respect the user-configured max parallelism when deciding parallelism
 new c8f4c8f555c [hotfix] Fix incorrect url link in batch_shuffle.md.
 new 58ca4d49c77 [FLINK-30860][doc] Add document for hybrid shuffle with 
adaptive batch scheduler.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md| 18 --
 docs/content/docs/deployment/elastic_scaling.md|  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md   | 18 --
 4 files changed, 36 insertions(+), 8 deletions(-)



[flink] 02/02: [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58ca4d49c772bcdc73036a5efe0b6b0039ea75e0
Author: Weijie Guo 
AuthorDate: Wed Feb 8 15:25:40 2023 +0800

[FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch 
scheduler.

This closes #21890
---
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md| 16 +++-
 docs/content/docs/deployment/elastic_scaling.md|  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md   | 16 +++-
 4 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md 
b/docs/content.zh/docs/deployment/elastic_scaling.md
index 1e7409fb989..c65c974a35b 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 - 根据数据量自动推导并行度可以更好地适应每天变化的数据量
 - SQL作业中的算子也可以分配不同的并行度
 
-当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 
`jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 
模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref 
"docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 
`ALL_EXCHANGES_BLOCKING`(默认值) 。
+当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 
`jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 
模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref 
"docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 
`ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 
`ALL_EXCHANGES_HYBRID_SELECTIVE`。
 
 ### 自动推导并发度
 
@@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 
 ### 局限性
 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
-- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle 
mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 
ALL_EXCHANGES_BLOCKING 的作业。
+- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 
[shuffle mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 
ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。
 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 
`StreamExecutionEnvironment#readFile(...)` 
`StreamExecutionEnvironment#readTextFile(...)` 和 
`StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive 
Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< 
ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL 
Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
 - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 
自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 
[FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。
 
diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md 
b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 9ec955ce27c..fda29f75a01 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between 
producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when 
all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from 
finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished 
producers.
+
+These could be configured via 
[jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref 
"docs/deployment/config" 
>}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is 
`UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set 
to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be 
degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is 
`ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with 
blocking shuffle. Since producers and consumers have the opportunity to run at 
the same time, more speculative execution tasks may be created, and the cost of 
failover will also increase. If you want to fall back 
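
Putting the two documented knobs together, a small, hedged example of turning on hybrid shuffle for a local batch job; the option keys and values are the ones quoted above, while the job itself is only a placeholder:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridShuffleDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Selective spilling; use ALL_EXCHANGES_HYBRID_FULL for the full strategy.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_SELECTIVE");
        // Optional: pin the data consumption constraint explicitly (see the list above).
        conf.setString(
                "jobmanager.partition.hybrid.partition-data-consume-constraint",
                "UNFINISHED_PRODUCERS");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromSequence(0, 999)
                .keyBy(v -> v % 10) // forces a shuffle between the two stages
                .reduce(Long::sum)
                .print();
        env.execute("hybrid-shuffle-demo");
    }
}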

[flink] 01/02: [hotfix] Fix incorrect url link in batch_shuffle.md.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8f4c8f555cdca20cac49a394a3f7ed3eb1256f1
Author: Weijie Guo 
AuthorDate: Wed Feb 8 15:27:52 2023 +0800

[hotfix] Fix incorrect url link in batch_shuffle.md.
---
 docs/content.zh/docs/ops/batch/batch_shuffle.md | 2 +-
 docs/content/docs/ops/batch/batch_shuffle.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md 
b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 3db398a0ad3..9ec955ce27c 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies:
 
 ### Usage
 
-To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
 ### Limitations
 
diff --git a/docs/content/docs/ops/batch/batch_shuffle.md 
b/docs/content/docs/ops/batch/batch_shuffle.md
index 1ff370339c9..1760ecaf6a7 100644
--- a/docs/content/docs/ops/batch/batch_shuffle.md
+++ b/docs/content/docs/ops/batch/batch_shuffle.md
@@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies:
 
 ### Usage
 
-To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
 ### Limitations
 



[flink] branch release-1.17 updated (44e6cfb87c1 -> e1c6352d18a)

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a change to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


from 44e6cfb87c1 [FLINK-30962][python] Improve the error message during 
launching py4j gateway server
 new c20ebde320b [hotfix] Adjust the parameters of 
AdaptiveBatchSchedulerITCase to make tests more reasonable
 new e1c6352d18a [FLINK-30917][runtime] Let adaptive batch scheduler also 
respect the user-configured max parallelism when deciding parallelism

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../adaptivebatch/AdaptiveBatchScheduler.java  |   5 +-
 ...faultVertexParallelismAndInputInfosDecider.java | 146 +++-
 .../VertexParallelismAndInputInfosDecider.java |   6 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |   6 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  66 -
 ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++---
 .../scheduling/AdaptiveBatchSchedulerITCase.java   |   7 +-
 7 files changed, 258 insertions(+), 125 deletions(-)



[flink] 01/02: [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c20ebde320b91ad87f7211ee0fd73416ef2934a6
Author: Lijie Wang 
AuthorDate: Fri Feb 10 21:42:38 2023 +0800

[hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make 
tests more reasonable
---
 .../apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
index 2dcc33585cd..cab7aa9ba74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -84,11 +84,11 @@ class AdaptiveBatchSchedulerITCase {
 final StreamExecutionEnvironment env =
 
StreamExecutionEnvironment.createLocalEnvironment(configuration);
 env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-env.setParallelism(4);
+env.setParallelism(8);
 
 final DataStream<Long> source =
 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
-.setParallelism(4)
+.setParallelism(8)
 .name("source")
 .slotSharingGroup("group1");
 
@@ -169,7 +169,8 @@ class AdaptiveBatchSchedulerITCase {
 configuration.setString(RestOptions.BIND_PORT, "0");
 configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
 configuration.setInteger(
-BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2);
+BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
+DEFAULT_MAX_PARALLELISM);
 configuration.set(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
 MemorySize.parse("150kb"));



[flink] 02/02: [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1c6352d18a19403e3eb80736c58b842de21bc88
Author: Lijie Wang 
AuthorDate: Mon Feb 6 16:46:35 2023 +0800

[FLINK-30917][runtime] Let adaptive batch scheduler also respect the 
user-configured max parallelism when deciding parallelism

This closes #21861
---
 .../adaptivebatch/AdaptiveBatchScheduler.java  |   5 +-
 ...faultVertexParallelismAndInputInfosDecider.java | 146 +++-
 .../VertexParallelismAndInputInfosDecider.java |   6 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |   6 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  66 -
 ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++---
 6 files changed, 254 insertions(+), 122 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 160f080c0fc..8abc880b2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -320,7 +320,10 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
 final ParallelismAndInputInfos parallelismAndInputInfos =
 
vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(
-jobVertex.getJobVertexId(), inputs, parallelism);
+jobVertex.getJobVertexId(),
+inputs,
+parallelism,
+jobVertex.getMaxParallelism());
 
 if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
 log.info(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
index 00a8eb467c5..e7326562852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@@ -75,54 +75,110 @@ public class DefaultVertexParallelismAndInputInfosDecider
  */
 private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768;
 
-private final int maxParallelism;
-private final int minParallelism;
+private final int globalMaxParallelism;
+private final int globalMinParallelism;
 private final long dataVolumePerTask;
-private final int defaultSourceParallelism;
+private final int globalDefaultSourceParallelism;
 
 private DefaultVertexParallelismAndInputInfosDecider(
-int maxParallelism,
-int minParallelism,
+int globalMaxParallelism,
+int globalMinParallelism,
 MemorySize dataVolumePerTask,
-int defaultSourceParallelism) {
+int globalDefaultSourceParallelism) {
 
-checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
+checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
 checkArgument(
-maxParallelism >= minParallelism,
+globalMaxParallelism >= globalMinParallelism,
 "Maximum parallelism should be greater than or equal to the 
minimum parallelism.");
 checkArgument(
-defaultSourceParallelism > 0,
+globalDefaultSourceParallelism > 0,
 "The default source parallelism must be larger than 0.");
 checkNotNull(dataVolumePerTask);
 
-this.maxParallelism = maxParallelism;
-this.minParallelism = minParallelism;
+this.globalMaxParallelism = globalMaxParallelism;
+this.globalMinParallelism = globalMinParallelism;
 this.dataVolumePerTask = dataVolumePerTask.getBytes();
-this.defaultSourceParallelism = defaultSourceParallelism;
+this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
 }
 
 @Override
 public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
 JobVertexID jobVertexId,
 List consumedResults,
-int initialParallelism) {
+int vertexInitialParallelism,
+int vertexMaxParallelism) {
 checkArgument(
-initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-|| initialParallelism > 0);
+
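
The shape of the change is visible in the signature: the decider previously capped parallelism only by its global bound, and it now also receives jobVertex.getMaxParallelism(). A hedged sketch of the clamping rule this implies; this is not the committed code, and the "not configured" sentinel is an assumption:

// Illustrative only: how a per-vertex max parallelism can narrow the global bound.
static int effectiveMaxParallelism(int globalMaxParallelism, int vertexMaxParallelism) {
    // Assumption: a non-positive vertexMaxParallelism means "not configured".
    return vertexMaxParallelism > 0
            ? Math.min(globalMaxParallelism, vertexMaxParallelism)
            : globalMaxParallelism;
}

On the DataStream API side, the per-vertex bound typically comes from calling setMaxParallelism(...) on the operator.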

[flink] 01/02: [hotfix] Fix incorrect url link in batch_shuffle.md.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b89f060892c388ca5b985babb1383c11347472cc
Author: Weijie Guo 
AuthorDate: Wed Feb 8 15:27:52 2023 +0800

[hotfix] Fix incorrect url link in batch_shuffle.md.
---
 docs/content.zh/docs/ops/batch/batch_shuffle.md | 2 +-
 docs/content/docs/ops/batch/batch_shuffle.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md 
b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 3db398a0ad3..9ec955ce27c 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies:
 
 ### Usage
 
-To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
 ### Limitations
 
diff --git a/docs/content/docs/ops/batch/batch_shuffle.md 
b/docs/content/docs/ops/batch/batch_shuffle.md
index 1ff370339c9..1760ecaf6a7 100644
--- a/docs/content/docs/ops/batch/batch_shuffle.md
+++ b/docs/content/docs/ops/batch/batch_shuffle.md
@@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies:
 
 ### Usage
 
-To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
+To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
 ### Limitations
 



[flink] branch master updated (b3998324b68 -> 5ceda3da20e)

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from b3998324b68 [FLINK-30917][runtime] Let adaptive batch scheduler also 
respect the user-configured max parallelism when deciding parallelism
 new b89f060892c [hotfix] Fix incorrect url link in batch_shuffle.md.
 new 5ceda3da20e [FLINK-30860][doc] Add document for hybrid shuffle with 
adaptive batch scheduler.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md| 18 --
 docs/content/docs/deployment/elastic_scaling.md|  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md   | 18 --
 4 files changed, 36 insertions(+), 8 deletions(-)



[flink] 02/02: [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ceda3da20ec5ed50849d89a629744fe9d4010d8
Author: Weijie Guo 
AuthorDate: Wed Feb 8 15:25:40 2023 +0800

[FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch 
scheduler.

This closes #21890
---
 docs/content.zh/docs/deployment/elastic_scaling.md |  4 ++--
 docs/content.zh/docs/ops/batch/batch_shuffle.md| 16 +++-
 docs/content/docs/deployment/elastic_scaling.md|  4 ++--
 docs/content/docs/ops/batch/batch_shuffle.md   | 16 +++-
 4 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md 
b/docs/content.zh/docs/deployment/elastic_scaling.md
index 1e7409fb989..c65c974a35b 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 - 根据数据量自动推导并行度可以更好地适应每天变化的数据量
 - SQL作业中的算子也可以分配不同的并行度
 
-当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 
`jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 
模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref 
"docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 
`ALL_EXCHANGES_BLOCKING`(默认值) 。
+当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 
`jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 
模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref 
"docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 
`ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 
`ALL_EXCHANGES_HYBRID_SELECTIVE`。
 
 ### 自动推导并发度
 
@@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
 
 ### 局限性
 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
-- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle 
mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 
ALL_EXCHANGES_BLOCKING 的作业。
+- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 
[shuffle mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 
ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。
 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 
`StreamExecutionEnvironment#readFile(...)` 
`StreamExecutionEnvironment#readTextFile(...)` 和 
`StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive 
Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< 
ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL 
Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
 - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 
自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 
[FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。
 
diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md 
b/docs/content.zh/docs/ops/batch/batch_shuffle.md
index 9ec955ce27c..fda29f75a01 100644
--- a/docs/content.zh/docs/ops/batch/batch_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md
@@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies:
 
 To use hybrid shuffle mode, you need to configure the 
[execution.batch-shuffle-mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling 
strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
 
+#### Data Consumption Constraints
+
+Hybrid shuffle divides the partition data consumption constraints between 
producer and consumer into the following three cases:
+
+- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when 
all producers are finished.
+- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from 
finished producers.
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished 
producers.
+
+These could be configured via 
[jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref 
"docs/deployment/config" 
>}}#jobmanager-partition-hybrid-partition-data-consume-constraint).
+
+- **For `AdaptiveBatchScheduler`** : The default constraint is 
`UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set 
to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be 
degraded.
+- **If `SpeculativeExecution` is enabled** : The default constraint is 
`ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with 
blocking shuffle. Since producers and consumers have the opportunity to run at 
the same time, more speculative execution tasks may be created, and the cost of 
failover will also increase. If you want to fall back to 

[flink] 02/02: [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3998324b685afc779954f7e54cc0d8f281267ec
Author: Lijie Wang 
AuthorDate: Mon Feb 6 16:46:35 2023 +0800

[FLINK-30917][runtime] Let adaptive batch scheduler also respect the 
user-configured max parallelism when deciding parallelism

This closes #21861
---
 .../adaptivebatch/AdaptiveBatchScheduler.java  |   5 +-
 ...faultVertexParallelismAndInputInfosDecider.java | 146 +++-
 .../VertexParallelismAndInputInfosDecider.java |   6 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |   6 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  66 -
 ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++---
 6 files changed, 254 insertions(+), 122 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 160f080c0fc..8abc880b2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -320,7 +320,10 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
 final ParallelismAndInputInfos parallelismAndInputInfos =
 
vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(
-jobVertex.getJobVertexId(), inputs, parallelism);
+jobVertex.getJobVertexId(),
+inputs,
+parallelism,
+jobVertex.getMaxParallelism());
 
 if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
 log.info(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
index 00a8eb467c5..e7326562852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@@ -75,54 +75,110 @@ public class DefaultVertexParallelismAndInputInfosDecider
  */
 private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768;
 
-private final int maxParallelism;
-private final int minParallelism;
+private final int globalMaxParallelism;
+private final int globalMinParallelism;
 private final long dataVolumePerTask;
-private final int defaultSourceParallelism;
+private final int globalDefaultSourceParallelism;
 
 private DefaultVertexParallelismAndInputInfosDecider(
-int maxParallelism,
-int minParallelism,
+int globalMaxParallelism,
+int globalMinParallelism,
 MemorySize dataVolumePerTask,
-int defaultSourceParallelism) {
+int globalDefaultSourceParallelism) {
 
-checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0.");
+checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0.");
 checkArgument(
-maxParallelism >= minParallelism,
+globalMaxParallelism >= globalMinParallelism,
 "Maximum parallelism should be greater than or equal to the 
minimum parallelism.");
 checkArgument(
-defaultSourceParallelism > 0,
+globalDefaultSourceParallelism > 0,
 "The default source parallelism must be larger than 0.");
 checkNotNull(dataVolumePerTask);
 
-this.maxParallelism = maxParallelism;
-this.minParallelism = minParallelism;
+this.globalMaxParallelism = globalMaxParallelism;
+this.globalMinParallelism = globalMinParallelism;
 this.dataVolumePerTask = dataVolumePerTask.getBytes();
-this.defaultSourceParallelism = defaultSourceParallelism;
+this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
 }
 
 @Override
 public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex(
 JobVertexID jobVertexId,
 List consumedResults,
-int initialParallelism) {
+int vertexInitialParallelism,
+int vertexMaxParallelism) {
 checkArgument(
-initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
-|| initialParallelism > 0);
+

[flink] branch master updated (e67b141f2b2 -> b3998324b68)

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from e67b141f2b2 [FLINK-30961][python] Remove deprecated "tests_require" in 
setup.py (#21893)
 new 2fc90245f57 [hotfix] Adjust the parameters of 
AdaptiveBatchSchedulerITCase to make tests more reasonable
 new b3998324b68 [FLINK-30917][runtime] Let adaptive batch scheduler also 
respect the user-configured max parallelism when deciding parallelism

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../adaptivebatch/AdaptiveBatchScheduler.java  |   5 +-
 ...faultVertexParallelismAndInputInfosDecider.java | 146 +++-
 .../VertexParallelismAndInputInfosDecider.java |   6 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |   6 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  66 -
 ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++---
 .../scheduling/AdaptiveBatchSchedulerITCase.java   |   7 +-
 7 files changed, 258 insertions(+), 125 deletions(-)



[flink] 01/02: [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable

2023-02-13 Thread wanglijie
This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2fc90245f570931f75924f4704f5d681ac88ffe4
Author: Lijie Wang 
AuthorDate: Fri Feb 10 21:42:38 2023 +0800

[hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make 
tests more reasonable
---
 .../apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
index 2dcc33585cd..cab7aa9ba74 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -84,11 +84,11 @@ class AdaptiveBatchSchedulerITCase {
 final StreamExecutionEnvironment env =
 
StreamExecutionEnvironment.createLocalEnvironment(configuration);
 env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-env.setParallelism(4);
+env.setParallelism(8);
 
 final DataStream<Long> source =
 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
-.setParallelism(4)
+.setParallelism(8)
 .name("source")
 .slotSharingGroup("group1");
 
@@ -169,7 +169,8 @@ class AdaptiveBatchSchedulerITCase {
 configuration.setString(RestOptions.BIND_PORT, "0");
 configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
 configuration.setInteger(
-BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2);
+BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
+DEFAULT_MAX_PARALLELISM);
 configuration.set(
 
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
 MemorySize.parse("150kb"));



[flink-table-store] branch master updated: [FLINK-31027] Introduce annotations for table store

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 706a92ff [FLINK-31027] Introduce annotations for table store
706a92ff is described below

commit 706a92ffeea7cdf8467d7839aee86132eae50eb5
Author: Shammon FY 
AuthorDate: Tue Feb 14 11:02:56 2023 +0800

[FLINK-31027] Introduce annotations for table store

This closes #526
---
 .../utils/BenchmarkGlobalConfiguration.java|  2 --
 .../table/store/annotation/Documentation.java  |  4 ---
 .../table/store/annotation/VisibleForTesting.java  | 31 +-
 .../flink/table/store/data/BinaryArrayWriter.java  |  2 --
 .../flink/table/store/data/BinaryRowWriter.java|  2 --
 .../store/data/columnar/ByteColumnVector.java  |  3 ---
 .../store/data/columnar/BytesColumnVector.java |  3 ---
 .../table/store/data/columnar/ColumnVector.java|  3 ---
 .../store/data/columnar/DoubleColumnVector.java|  3 ---
 .../store/data/columnar/FloatColumnVector.java |  3 ---
 .../table/store/data/columnar/IntColumnVector.java |  3 ---
 .../store/data/columnar/LongColumnVector.java  |  3 ---
 .../table/store/data/columnar/RowColumnVector.java |  3 ---
 .../store/data/columnar/ShortColumnVector.java |  3 ---
 .../data/columnar/heap/AbstractHeapVector.java |  2 --
 .../data/columnar/heap/HeapBooleanVector.java  |  2 --
 .../store/data/columnar/heap/HeapByteVector.java   |  2 --
 .../store/data/columnar/heap/HeapBytesVector.java  |  2 --
 .../store/data/columnar/heap/HeapDoubleVector.java |  2 --
 .../store/data/columnar/heap/HeapFloatVector.java  |  2 --
 .../store/data/columnar/heap/HeapIntVector.java|  2 --
 .../store/data/columnar/heap/HeapLongVector.java   |  2 --
 .../store/data/columnar/heap/HeapRowVector.java|  2 --
 .../store/data/columnar/heap/HeapShortVector.java  |  2 --
 .../columnar/writable/AbstractWritableVector.java  |  2 --
 .../columnar/writable/WritableBooleanVector.java   |  2 --
 .../data/columnar/writable/WritableByteVector.java |  2 --
 .../columnar/writable/WritableBytesVector.java |  2 --
 .../columnar/writable/WritableColumnVector.java|  2 --
 .../columnar/writable/WritableDoubleVector.java|  2 --
 .../columnar/writable/WritableFloatVector.java |  2 --
 .../data/columnar/writable/WritableIntVector.java  |  2 --
 .../data/columnar/writable/WritableLongVector.java |  2 --
 .../columnar/writable/WritableShortVector.java |  2 --
 .../table/store/file/utils/RecyclableIterator.java |  3 ---
 .../flink/table/store/fs/hadoop/HadoopFileIO.java  |  2 +-
 .../apache/flink/table/store/io/DataInputView.java |  2 --
 .../flink/table/store/io/DataInputViewStream.java  |  3 ---
 .../table/store/io/DataInputViewStreamWrapper.java |  3 ---
 .../flink/table/store/io/DataOutputViewStream.java |  3 ---
 .../store/io/DataOutputViewStreamWrapper.java  |  3 ---
 .../table/store/memory/MemorySegmentUtils.java |  2 +-
 .../store/options/description/Description.java |  3 ---
 .../table/store/plugin/ComponentClassLoader.java   |  2 +-
 .../apache/flink/table/store/types/DateType.java   |  3 ---
 .../flink/table/store/utils/OperatingSystem.java   |  3 ---
 .../flink/table/store/utils/Preconditions.java |  3 ---
 .../flink/table/store/utils/ThrowingConsumer.java  |  3 ---
 .../flink/table/store/datagen/DataGenerator.java   |  3 ---
 .../store/datagen/DataGeneratorContainer.java  |  2 --
 .../table/store/datagen/DataGeneratorMapper.java   |  2 --
 .../flink/table/store/datagen/RandomGenerator.java |  3 ---
 .../store/datagen/RandomGeneratorVisitor.java  |  2 --
 .../flink/table/store/types/DataTypeAssert.java|  2 --
 .../store/connector/AbstractTableStoreFactory.java |  2 +-
 .../flink/table/store/connector/FlinkCatalog.java  |  2 +-
 .../table/store/kafka/KafkaLogSinkProvider.java|  2 +-
 .../table/store/kafka/KafkaLogSourceProvider.java  |  2 +-
 .../flink/table/store/file/AbstractFileStore.java  |  2 +-
 .../apache/flink/table/store/file/KeyValue.java|  2 +-
 .../file/append/AppendOnlyCompactManager.java  |  2 +-
 .../table/store/file/catalog/CatalogLock.java  |  3 ---
 .../table/store/file/io/DataFilePathFactory.java   |  2 +-
 .../store/file/io/KeyValueFileWriterFactory.java   |  2 +-
 .../table/store/file/io/RollingFileWriter.java |  2 +-
 .../table/store/file/manifest/ManifestFile.java|  2 +-
 .../store/file/mergetree/MergeTreeWriter.java  |  2 +-
 .../file/mergetree/SortBufferWriteBuffer.java  |  2 +-
 .../table/store/file/mergetree/SortedRun.java  |  2 +-
 .../mergetree/compact/MergeTreeCompactManager.java |  2 +-
 .../mergetree/compact/UniversalCompaction.java |  2 +-
 .../file/operation/AbstractFileStoreWrite.java |  2 +-
 .../store/file/operation/FileStoreExpireImpl.java  |  2 +-
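
The file list shows per-module helpers being removed in favor of a shared org.apache.flink.table.store.annotation package. As a rough sketch, a marker annotation of this kind can be as small as the following; the committed retention and targets may differ:

package org.apache.flink.table.store.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
 * Marks a type, field, method, or constructor whose visibility was relaxed
 * only so that tests can reach it.
 */
@Documented
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR})
public @interface VisibleForTesting {}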
 

[flink-connector-pulsar] branch main updated: [FLINK-31048][Connector/Pulsar] Fix incorrect java doc in PulsarSource and PulsarSourceBuilder.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new fb98096  [FLINK-31048][Connector/Pulsar] Fix incorrect java doc in 
PulsarSource and PulsarSourceBuilder.
fb98096 is described below

commit fb98096d9a67d26b1965b8ef181125317a59a5c4
Author: Weijie Guo 
AuthorDate: Tue Feb 14 01:18:06 2023 +0800

[FLINK-31048][Connector/Pulsar] Fix incorrect java doc in PulsarSource and 
PulsarSourceBuilder.

This closes #28
---
 .../java/org/apache/flink/connector/pulsar/source/PulsarSource.java   | 2 +-
 .../org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index c7d272b..c038f32 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -57,7 +57,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
  * .setServiceUrl(getServiceUrl())
  * .setAdminUrl(getAdminUrl())
  * .setSubscriptionName("test")
- * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .setDeserializationSchema(new SimpleStringSchema())
  * .setBounded(StopCursor::defaultStopCursor)
  * .build();
  * }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 80d8c30..1c8cfbe 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -89,7 +89,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
  * .setSubscriptionName("flink-source-1")
  * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
- * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .setDeserializationSchema(new SimpleStringSchema())
  * .build();
  * }
  *
@@ -118,7 +118,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
  * .setSubscriptionName("flink-source-1")
  * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
- * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .setDeserializationSchema(new SimpleStringSchema())
  * 
.setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
  * .build();
  * }



[flink] branch master updated: [FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893)

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e67b141f2b2 [FLINK-30961][python] Remove deprecated "tests_require" in 
setup.py (#21893)
e67b141f2b2 is described below

commit e67b141f2b2414cf9b6ddc43e817d4250dd38319
Author: Juntao Hu 
AuthorDate: Mon Feb 13 18:51:20 2023 -0800

[FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893)
---
 flink-python/dev/dev-requirements.txt | 1 +
 flink-python/setup.py | 1 -
 flink-python/tox.ini  | 1 -
 3 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-python/dev/dev-requirements.txt 
b/flink-python/dev/dev-requirements.txt
index ff19b6f1f15..f36142c0fb2 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -31,3 +31,4 @@ grpcio-tools>=1.29.0,<=1.46.3
 pemja==0.3.0; platform_system != 'Windows'
 httplib2>=0.19.0,<=0.20.4
 protobuf>=3.19.0,<=3.21
+pytest~=7.0
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 9e0affe6c68..893127d232e 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -330,7 +330,6 @@ try:
 python_requires='>=3.7',
 install_requires=install_requires,
 cmdclass={'build_ext': build_ext},
-tests_require=['pytest==4.4.1'],
 description='Apache Flink Python API',
 long_description=long_description,
 long_description_content_type='text/markdown',
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 78f82a65cf6..ac8573a69dc 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -28,7 +28,6 @@ whitelist_externals=
 /bin/bash
 deps =
 -r dev/dev-requirements.txt
-pytest
 apache-flink-libraries
 passenv = *
 commands =



[flink] branch release-1.15 updated: [FLINK-30962][python] Improve the error message during launching py4j gateway server

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new b6cbe5403d8 [FLINK-30962][python] Improve the error message during 
launching py4j gateway server
b6cbe5403d8 is described below

commit b6cbe5403d86d42b26b2105d5be9d016384bf736
Author: Juntao Hu 
AuthorDate: Wed Feb 8 18:05:30 2023 +0800

[FLINK-30962][python] Improve the error message during launching py4j 
gateway server

This closes #21894.
---
 flink-python/pyflink/java_gateway.py   | 6 +-
 flink-python/pyflink/pyflink_gateway_server.py | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index fdb47a8c754..963b7454439 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -109,7 +109,11 @@ def launch_gateway():
 time.sleep(0.1)
 
 if not os.path.isfile(conn_info_file):
-raise Exception("Java gateway process exited before sending its 
port number")
+stderr_info = p.stderr.read().decode('utf-8')
+raise RuntimeError(
+"Java gateway process exited before sending its port 
number.\nStderr:\n"
++ stderr_info
+)
 
 with open(conn_info_file, "rb") as info:
 gateway_port = struct.unpack("!I", info.read(4))[0]
diff --git a/flink-python/pyflink/pyflink_gateway_server.py 
b/flink-python/pyflink/pyflink_gateway_server.py
index 60301da3843..b685c0922c2 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -323,7 +323,7 @@ def launch_gateway_server_process(env, args):
 signal.signal(signal.SIGINT, signal.SIG_IGN)
 preexec_fn = preexec_func
 return Popen(list(filter(lambda c: len(c) != 0, command)),
- stdin=PIPE, preexec_fn=preexec_fn, env=env)
+ stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
 
 
 if __name__ == "__main__":



[flink] branch release-1.16 updated (c0d52c09340 -> c57699cb471)

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from c0d52c09340 [FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose
 add c57699cb471 [FLINK-30962][python] Improve the error message during 
launching py4j gateway server

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/java_gateway.py   | 6 +-
 flink-python/pyflink/pyflink_gateway_server.py | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)



[flink] branch release-1.17 updated: [FLINK-30962][python] Improve the error message during launching py4j gateway server

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 44e6cfb87c1 [FLINK-30962][python] Improve the error message during 
launching py4j gateway server
44e6cfb87c1 is described below

commit 44e6cfb87c1b2a5f4106df61cd06c4870d4802f8
Author: Juntao Hu 
AuthorDate: Wed Feb 8 18:05:30 2023 +0800

[FLINK-30962][python] Improve the error message during launching py4j 
gateway server

This closes #21894.
---
 flink-python/pyflink/java_gateway.py   | 6 +-
 flink-python/pyflink/pyflink_gateway_server.py | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index bdcb57e8094..4bd1c3c858e 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -109,7 +109,11 @@ def launch_gateway():
 time.sleep(0.1)
 
 if not os.path.isfile(conn_info_file):
-raise Exception("Java gateway process exited before sending its 
port number")
+stderr_info = p.stderr.read().decode('utf-8')
+raise RuntimeError(
+"Java gateway process exited before sending its port 
number.\nStderr:\n"
++ stderr_info
+)
 
 with open(conn_info_file, "rb") as info:
 gateway_port = struct.unpack("!I", info.read(4))[0]
diff --git a/flink-python/pyflink/pyflink_gateway_server.py 
b/flink-python/pyflink/pyflink_gateway_server.py
index f7947d7897c..fdb791120ef 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -261,7 +261,7 @@ def launch_gateway_server_process(env, args):
 signal.signal(signal.SIGINT, signal.SIG_IGN)
 preexec_fn = preexec_func
 return Popen(list(filter(lambda c: len(c) != 0, command)),
- stdin=PIPE, preexec_fn=preexec_fn, env=env)
+ stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
 
 
 if __name__ == "__main__":



[flink] branch master updated (3a647d6bec7 -> 50c522f1d81)

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


from 3a647d6bec7 [FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose
 add 50c522f1d81 [FLINK-30962][python] Improve the error message during 
launching py4j gateway server

No new revisions were added by this update.

Summary of changes:
 flink-python/pyflink/java_gateway.py   | 6 +-
 flink-python/pyflink/pyflink_gateway_server.py | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)



[flink] branch release-1.16 updated (0994832be8a -> c0d52c09340)

2023-02-13 Thread czweng
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from 0994832be8a [FLINK-30640][sql-client] Fix unstable ctas test in 
CliClientITCase (#21896)
 add c0d52c09340 [FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/table/api/TableITCase.scala   | 37 --
 1 file changed, 21 insertions(+), 16 deletions(-)



[flink] branch release-1.17 updated: [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose

2023-02-13 Thread czweng
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new bb66b989188 [FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose
bb66b989188 is described below

commit bb66b989188c661d9d7847b6a5728697a3a2dcd2
Author: Matthias Pohl 
AuthorDate: Tue Feb 14 03:26:19 2023 +0100

[FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose

This closes #21921.
---
 .../org/apache/flink/table/api/TableITCase.scala   | 37 --
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
index e108c98e265..e09c6274aab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
@@ -24,6 +24,7 @@ import 
org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.catalog.{Column, ResolvedSchema}
 import org.apache.flink.table.planner.utils.TestTableSourceSinks
 import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.test.util.AbstractTestBase.MINI_CLUSTER_RESOURCE
 import org.apache.flink.types.{Row, RowKind}
 import org.apache.flink.util.CollectionUtil
 
@@ -32,7 +33,7 @@ import _root_.java.util
 import org.hamcrest.MatcherAssert.assertThat
 import org.hamcrest.Matchers.containsInAnyOrder
 import org.junit.{Before, Rule, Test}
-import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -108,28 +109,32 @@ class TableITCase(tableEnvName: String, isStreaming: 
Boolean) extends AbstractTe
 
   @Test
   def testCollectWithClose(): Unit = {
-val query =
+val sourceDdl =
   """
-|select id, concat(concat(`first`, ' '), `last`) as `full name`
-|from MyTable where mod(id, 2) = 0
-  """.stripMargin
+|create table unbounded_source (
+|  id int
+|) with (
+|  'connector' = 'datagen',
+|  'number-of-rows' = '1',
+|  'rows-per-second' = '1' -- slow producing speed to make sure that
+|  -- source is not finished when job is cancelled
+|)
+|""".stripMargin
+tEnv.executeSql(sourceDdl)
+val query = "select id from unbounded_source where mod(id, 2) = 0"
 val table = tEnv.sqlQuery(query)
 val tableResult = table.execute()
 assertTrue(tableResult.getJobClient.isPresent)
 assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
 val it = tableResult.collect()
 it.close()
-val jobStatus =
-  try {
-Some(tableResult.getJobClient.get().getJobStatus.get())
-  } catch {
-// ignore the exception,
-        // because the MiniCluster maybe already been shut down when getting job status
-case _: Throwable => None
-  }
-if (jobStatus.isDefined) {
-  assertNotEquals(JobStatus.RUNNING, jobStatus.get)
-}
+
+// wait for mini cluster to shut down
+val jobClient = tableResult.getJobClient.get()
+val jobId = jobClient.getJobID
+MINI_CLUSTER_RESOURCE.getClusterClient.requestJobResult(jobId).get()
+
+assertEquals(JobStatus.CANCELED, jobClient.getJobStatus.get())
   }
 
   @Test
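
The shape of this stabilization is reusable: instead of sampling a job status that may still be changing (and swallowing exceptions when the MiniCluster is already gone), the test blocks on a future that only completes once the job reaches a terminal state, then asserts on that state. A tiny Java sketch of the idea, with a plain CompletableFuture standing in for ClusterClient#requestJobResult (names here are illustrative, not Flink API):

import java.util.concurrent.CompletableFuture;

/** Sketch: assert on a terminal state only after blocking for it, never by sampling. */
final class TerminalStateWait {
    enum Status { RUNNING, CANCELED, FINISHED }

    public static void main(String[] args) {
        // Stand-in for requestJobResult(jobId): completes only when the job is done.
        CompletableFuture<Status> terminal = CompletableFuture.supplyAsync(() -> Status.CANCELED);
        // join() blocks until completion, so the assertion below cannot race the shutdown.
        Status result = terminal.join();
        if (result != Status.CANCELED) {
            throw new AssertionError("expected CANCELED but was " + result);
        }
        System.out.println("job reached terminal state: " + result);
    }
}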



[flink] branch master updated: [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose

2023-02-13 Thread czweng
This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 3a647d6bec7 [FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose
3a647d6bec7 is described below

commit 3a647d6bec7d0413bcbc71668ed45be57bb4bfe6
Author: tsreaper 
AuthorDate: Tue Feb 14 10:25:47 2023 +0800

[FLINK-25813][table-planner] Fix unstable test 
TableITCase#testCollectWithClose

This closes #18893.
---
 .../org/apache/flink/table/api/TableITCase.scala   | 37 --
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
index e108c98e265..e09c6274aab 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
@@ -24,6 +24,7 @@ import 
org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.catalog.{Column, ResolvedSchema}
 import org.apache.flink.table.planner.utils.TestTableSourceSinks
 import org.apache.flink.test.util.AbstractTestBase
+import org.apache.flink.test.util.AbstractTestBase.MINI_CLUSTER_RESOURCE
 import org.apache.flink.types.{Row, RowKind}
 import org.apache.flink.util.CollectionUtil
 
@@ -32,7 +33,7 @@ import _root_.java.util
 import org.hamcrest.MatcherAssert.assertThat
 import org.hamcrest.Matchers.containsInAnyOrder
 import org.junit.{Before, Rule, Test}
-import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -108,28 +109,32 @@ class TableITCase(tableEnvName: String, isStreaming: 
Boolean) extends AbstractTe
 
   @Test
   def testCollectWithClose(): Unit = {
-val query =
+val sourceDdl =
   """
-|select id, concat(concat(`first`, ' '), `last`) as `full name`
-|from MyTable where mod(id, 2) = 0
-  """.stripMargin
+|create table unbounded_source (
+|  id int
+|) with (
+|  'connector' = 'datagen',
+|  'number-of-rows' = '1',
+|  'rows-per-second' = '1' -- slow producing speed to make sure that
+|  -- source is not finished when job is cancelled
+|)
+|""".stripMargin
+tEnv.executeSql(sourceDdl)
+val query = "select id from unbounded_source where mod(id, 2) = 0"
 val table = tEnv.sqlQuery(query)
 val tableResult = table.execute()
 assertTrue(tableResult.getJobClient.isPresent)
 assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
 val it = tableResult.collect()
 it.close()
-val jobStatus =
-  try {
-Some(tableResult.getJobClient.get().getJobStatus.get())
-  } catch {
-// ignore the exception,
-        // because the MiniCluster maybe already been shut down when getting job status
-case _: Throwable => None
-  }
-if (jobStatus.isDefined) {
-  assertNotEquals(JobStatus.RUNNING, jobStatus.get)
-}
+
+// wait for mini cluster to shut down
+val jobClient = tableResult.getJobClient.get()
+val jobId = jobClient.getJobID
+MINI_CLUSTER_RESOURCE.getClusterClient.requestJobResult(jobId).get()
+
+assertEquals(JobStatus.CANCELED, jobClient.getJobStatus.get())
   }
 
   @Test



[flink-table-store] branch master updated: [hotfix] Fix options in FlinkSinkBuilder

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new aba5e166 [hotfix] Fix options in FlinkSinkBuilder
aba5e166 is described below

commit aba5e166b69d1a8437222988f777fa004268980f
Author: Shammon FY 
AuthorDate: Tue Feb 14 10:20:48 2023 +0800

[hotfix] Fix options in FlinkSinkBuilder

This closes #530
---
 .../apache/flink/table/store/connector/sink/FlinkSinkBuilder.java   | 2 +-
 .../table/store/connector/sink/FileStoreShuffleBucketTest.java  | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 7d0e570c..7358201d 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -89,7 +89,7 @@ public class FlinkSinkBuilder {
 table.schema(),
 table.options()
 .toConfiguration()
-                                .getBoolean(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
+                                .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
 PartitionTransformation partitioned =
                 new PartitionTransformation<>(input.getTransformation(), partitioner);
 if (parallelism != null) {
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
index c7588063..eeff959b 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
@@ -36,8 +36,8 @@ import 
org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
 private static final int TOTAL_SOURCE_RECORD_COUNT = 1000;
 
-@Before
+@BeforeEach
 public void after() throws Exception {
 super.before();
 CollectStoreSinkWrite.writeRowsMap.clear();
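
The hotfix above lands on the typed get(ConfigOption) accessor rather than a raw getBoolean call. A minimal sketch of that access pattern, using Flink's own Configuration class as a stand-in for Table Store's Options (the option key comes from this repository; the class and main method are illustrative only):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

/** Sketch: read a typed ConfigOption so the declared default applies when unset. */
public final class TypedOptionAccess {

    static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
            ConfigOptions.key("sink.partition-shuffle")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Shuffle by dynamic partition fields in the sink phase.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // get(...) falls back to the option's declared default (false) when the key is unset.
        boolean shuffle = conf.get(SINK_SHUFFLE_BY_PARTITION);
        System.out.println("shuffle by partition: " + shuffle);
    }
}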



[flink] branch release-1.16 updated (700f8839126 -> 0994832be8a)

2023-02-13 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from 700f8839126 [FLINK-28440][state/changelog] record reference count of 
StreamStateHandle in TaskChangelogRegistry
 add 0994832be8a [FLINK-30640][sql-client] Fix unstable ctas test in 
CliClientITCase (#21896)

No new revisions were added by this update.

Summary of changes:
 flink-table/flink-sql-client/src/test/resources/sql/set.q | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)



[flink-table-store] branch master updated: [FLINK-30997] Refactor tests in connector to extends AbstractTestBase

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 74cc7ce1 [FLINK-30997] Refactor tests in connector to extends 
AbstractTestBase
74cc7ce1 is described below

commit 74cc7ce1e516e96349021fa7a4c96f2e992d1b28
Author: Shammon FY 
AuthorDate: Tue Feb 14 09:30:01 2023 +0800

[FLINK-30997] Refactor tests in connector to extends AbstractTestBase

This closes #512
---
 .../store/connector/AppendOnlyTableITCase.java |   2 +-
 .../store/connector/BatchFileStoreITCase.java  |   2 +-
 .../table/store/connector/CatalogITCaseBase.java   |  17 +-
 .../table/store/connector/CatalogTableITCase.java  |   7 +-
 ...AndMultiPartitionedTableWIthKafkaLogITCase.java |   6 +-
 .../ComputedColumnAndWatermarkTableITCase.java |   6 +-
 .../store/connector/ContinuousFileStoreITCase.java |  35 ++--
 .../table/store/connector/FileStoreITCase.java |  50 ++---
 .../store/connector/FileSystemCatalogITCase.java   |  16 +-
 .../store/connector/ForceCompactionITCase.java |   2 +-
 .../connector/FullCompactionFileStoreITCase.java   |   6 +-
 .../table/store/connector/LargeDataITCase.java |   2 +-
 .../table/store/connector/LogSystemITCase.java |   8 +-
 .../table/store/connector/LookupJoinITCase.java|  10 +-
 .../table/store/connector/MappingTableITCase.java  |  10 +-
 .../table/store/connector/PartialUpdateITCase.java |   2 +-
 .../store/connector/PreAggregationITCase.java  |   2 +-
 .../table/store/connector/PredicateITCase.java |   2 +-
 .../table/store/connector/RescaleBucketITCase.java |  28 ++-
 .../table/store/connector/SchemaChangeITCase.java  |   2 +-
 .../StreamingReadWriteTableWithKafkaLogITCase.java |  10 +-
 .../store/connector/StreamingWarehouseITCase.java  |   2 +-
 .../store/connector/sink/CompactorSinkITCase.java  |  27 +--
 .../store/connector/sink/SinkSavepointITCase.java  |  53 +++--
 .../connector/source/CompactorSourceITCase.java|  10 +-
 .../store/connector/util/AbstractTestBase.java |  22 +-
 .../util/MiniClusterWithClientExtension.java   | 225 +
 .../table/store/kafka/KafkaTableTestBase.java  |  71 ---
 28 files changed, 449 insertions(+), 186 deletions(-)

diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 757934eb..7bc17944 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
index 7fdd2278..b6ab0591 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.types.Row;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.List;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index cc916ddb..6b03d548 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -30,22 +30,22 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.connector.util.AbstractTestBase;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.fs.Path;
 import org.apache.flink.table.store.fs.local.LocalFileIO;
-import 
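
Alongside the move to the connector's own AbstractTestBase, these hunks migrate the tests from JUnit 4 to JUnit 5: org.junit.Test becomes org.junit.jupiter.api.Test and @Before becomes @BeforeEach. A minimal sketch of the migrated shape (a hypothetical test class, not one of the classes above):

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Sketch of the JUnit 5 lifecycle the refactored tests adopt. */
class MigratedLifecycleSketch {

    private int counter;

    @BeforeEach
    void setUp() {
        // Runs before every @Test, like JUnit 4's @Before did.
        counter = 1;
    }

    @Test
    void counterIsReset() {
        assertEquals(1, counter);
    }
}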

[flink-table-store] branch master updated: [FLINK-30979] Support shuffling data by partition

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 219c4de5 [FLINK-30979] Support shuffling data by partition
219c4de5 is described below

commit 219c4de518c419eade41e99fc419d7f2ccfed213
Author: Shammon FY 
AuthorDate: Tue Feb 14 09:28:47 2023 +0800

[FLINK-30979] Support shuffling data by partition

This closes #522
---
 docs/content/docs/how-to/writing-tables.md |   3 +
 .../generated/flink_connector_configuration.html   |   6 +
 .../store/connector/FlinkConnectorOptions.java |   7 +
 .../connector/sink/BucketStreamPartitioner.java|  32 +++-
 .../table/store/connector/sink/FlinkSink.java  |   9 +-
 .../store/connector/sink/FlinkSinkBuilder.java |  23 ++-
 .../connector/sink/FileStoreShuffleBucketTest.java | 163 +
 .../table/store/table/sink/PartitionComputer.java  |  43 ++
 8 files changed, 273 insertions(+), 13 deletions(-)

diff --git a/docs/content/docs/how-to/writing-tables.md 
b/docs/content/docs/how-to/writing-tables.md
index dcdfae3c..14c19c42 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -96,6 +96,9 @@ Use `INSERT INTO` to apply records and changes to tables.
 INSERT INTO MyTable SELECT ...
 ```
 
+Table Store shuffles data by bucket in the sink phase. To mitigate data skew, Table Store also
+supports shuffling data by partition fields. You can add the option `sink.partition-shuffle` to the table.
+
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d5d40b36..6431a767 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,5 +26,11 @@
             <td>Integer</td>
             <td>Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
         </tr>
+        <tr>
+            <td><h5>sink.partition-shuffle</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>The option to enable shuffling data by dynamic partition fields in the sink phase for Table Store.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 127269e2..504cdb43 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -62,6 +62,13 @@ public class FlinkConnectorOptions {
 + "By default, if this option is not 
defined, the planner will derive the parallelism "
 + "for each statement individually by also 
considering the global configuration.");
 
+    public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
+            ConfigOptions.key("sink.partition-shuffle")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The option to enable shuffling data by dynamic partition fields in the sink phase for Table Store.");
+
     public static final ConfigOption<Integer> SCAN_PARALLELISM =
             ConfigOptions.key("scan.parallelism")
                     .intType()
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index 7b3d6d29..d8bd0c20 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -24,32 +24,48 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.connector.FlinkRowWrapper;
+import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.sink.BucketComputer;
+import org.apache.flink.table.store.table.sink.PartitionComputer;
+
+import java.util.Objects;
+import java.util.function.Function;
 
 /** A {@link StreamPartitioner} to partition 
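
The partitioner diff is truncated above; as a rough sketch of the routing idea when `sink.partition-shuffle` is enabled, a record's sink channel would be chosen by hashing its dynamic partition values rather than only its bucket. The helper below is hypothetical and not the actual BucketStreamPartitioner logic:

import java.util.Objects;

/** Hypothetical sketch: pick a sink channel from the record's partition fields. */
final class PartitionShuffleSketch {

    static int selectChannel(Object[] partitionFields, int numberOfChannels) {
        // Hashing the partition values spreads a skewed partition's records
        // across channels differently than a pure bucket hash would.
        int hash = Objects.hash(partitionFields);
        return Math.floorMod(hash, numberOfChannels);
    }

    public static void main(String[] args) {
        System.out.println(selectChannel(new Object[] {"2023-02-13", 7}, 4));
    }
}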

[flink-connector-pulsar] branch main updated: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception. (#24)

2023-02-13 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new 2278653  [FLINK-30109][Connector/Pulsar] Drop the use of sneaky 
exception. (#24)
2278653 is described below

commit 2278653d67a8ddf171c88d538a288e503221625a
Author: Yufan Sheng 
AuthorDate: Mon Feb 13 21:46:26 2023 +0800

[FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception. (#24)
---
 .../pulsar/common/config/PulsarClientFactory.java  |  21 ++--
 .../pulsar/common/utils/PulsarExceptionUtils.java  |  83 
 .../common/utils/PulsarTransactionUtils.java   |  26 ++--
 .../flink/connector/pulsar/sink/PulsarSink.java|   5 +-
 .../pulsar/sink/committer/PulsarCommitter.java |   7 +-
 .../pulsar/sink/config/SinkConfiguration.java  |   2 +-
 .../connector/pulsar/sink/writer/PulsarWriter.java |   6 +-
 .../pulsar/sink/writer/topic/MetadataListener.java |   4 +-
 .../pulsar/sink/writer/topic/ProducerRegister.java |  31 +++--
 .../connector/pulsar/source/PulsarSource.java  |   7 +-
 .../pulsar/source/PulsarSourceBuilder.java |   2 +-
 .../source/enumerator/PulsarSourceEnumerator.java  |  53 
 .../source/enumerator/cursor/StopCursor.java   |   2 +-
 .../cursor/stop/LatestMessageStopCursor.java   |   7 +-
 .../enumerator/subscriber/PulsarSubscriber.java|  15 ++-
 .../subscriber/impl/BasePulsarSubscriber.java  |  50 +---
 .../subscriber/impl/TopicListSubscriber.java   |  32 ++---
 .../subscriber/impl/TopicPatternSubscriber.java|  74 ++-
 .../source/reader/PulsarPartitionSplitReader.java  |  29 +++--
 .../source/reader/PulsarSourceFetcherManager.java  |  22 ++--
 .../pulsar/source/split/PulsarPartitionSplit.java  |   7 +-
 .../pulsar/common/schema/PulsarSchemaTest.java |   1 +
 .../sink/writer/topic/MetadataListenerTest.java|   6 +-
 .../sink/writer/topic/ProducerRegisterTest.java|   6 +-
 .../enumerator/PulsarSourceEnumeratorTest.java |  25 ++--
 .../source/enumerator/cursor/StopCursorTest.java   |   4 +-
 .../subscriber/PulsarSubscriberTest.java   |  41 +++---
 .../reader/PulsarPartitionSplitReaderTest.java |  74 +--
 .../source/reader/PulsarSourceReaderTest.java  |   2 +-
 .../PulsarDeserializationSchemaTest.java   |  16 +--
 .../pulsar/testutils/PulsarTestCommonUtils.java|   6 -
 .../pulsar/testutils/PulsarTestEnvironment.java|   8 +-
 .../pulsar/testutils/function/ControlSource.java   |  14 +--
 .../pulsar/testutils/runtime/PulsarRuntime.java|   4 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   | 140 -
 .../runtime/container/PulsarContainerRuntime.java  |  16 +--
 .../testutils/sink/PulsarSinkTestContext.java  |   7 +-
 .../sink/reader/PulsarPartitionDataReader.java |  10 +-
 .../cases/MultipleTopicsConsumingContext.java  |  12 +-
 .../source/cases/SingleTopicConsumingContext.java  |  17 ++-
 .../writer/KeyedPulsarPartitionDataWriter.java |  15 ++-
 .../source/writer/PulsarEncryptDataWriter.java |  19 ++-
 .../source/writer/PulsarPartitionDataWriter.java   |   7 +-
 43 files changed, 453 insertions(+), 482 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
index 1f01b24..4939a23 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 import java.util.Map;
@@ -76,7 +77,6 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY;
-import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.pulsar.client.api.SizeUnit.BYTES;
 
 /** The factory for creating pulsar client classes from {@link 
PulsarConfiguration}. */
@@ -88,7 +88,8 @@ public final class PulsarClientFactory {
 }
 
 /** Create a PulsarClient by using the flink 
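
For context on the commit title: a "sneaky" rethrow uses unchecked generics to throw a checked exception without declaring it, so callers never see PulsarClientException in any signature. A self-contained Java sketch of the idiom being dropped (illustrative names only, not the connector's code):

import java.io.IOException;

/** Sketch: the "sneaky throw" idiom hides checked exceptions from the compiler. */
final class SneakyVsDeclared {

    @SuppressWarnings("unchecked")
    static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        // E is inferred as RuntimeException at call sites, so nothing is declared.
        throw (E) t;
    }

    static void sneaky() {
        sneakyThrow(new IOException("hidden checked exception"));
    }

    static void declared() throws IOException {
        throw new IOException("visible in the method signature");
    }

    public static void main(String[] args) {
        try {
            sneaky(); // the compiler does not force a catch here
        } catch (Throwable t) {
            System.out.println("caught anyway: " + t);
        }
    }
}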

[flink-connector-pulsar] branch main updated: [FLINK-30489][Connector/Pulsar] Shade all the dependencies in flink-sql-connector-pulsar. (#26)

2023-02-13 Thread tison
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
 new 5f0bb2d  [FLINK-30489][Connector/Pulsar] Shade all the dependencies in 
flink-sql-connector-pulsar. (#26)
5f0bb2d is described below

commit 5f0bb2db9f1f357a11cce965eb5832bb908523a1
Author: Yufan Sheng 
AuthorDate: Mon Feb 13 21:41:52 2023 +0800

[FLINK-30489][Connector/Pulsar] Shade all the dependencies in 
flink-sql-connector-pulsar. (#26)
---
 flink-sql-connector-pulsar/pom.xml | 25 +
 1 file changed, 25 insertions(+)

diff --git a/flink-sql-connector-pulsar/pom.xml 
b/flink-sql-connector-pulsar/pom.xml
index bec9d3d..eec5b3d 100644
--- a/flink-sql-connector-pulsar/pom.xml
+++ b/flink-sql-connector-pulsar/pom.xml
@@ -74,6 +74,31 @@ under the License.

 						<include>org.slf4j:jul-to-slf4j</include>
 					</includes>
 				</artifactSet>
+				<filters>
+					<filter>
+						<artifact>org.apache.pulsar:pulsar-client-all</artifact>
+						<excludes>
+							<exclude>META-INF/versions/**/*</exclude>
+							<exclude>io/swagger/**/*</exclude>
+						</excludes>
+					</filter>
+					<filter>
+						<artifact>org.bouncycastle:*</artifact>
+						<excludes>
+							<exclude>META-INF/versions/**/*</exclude>
+						</excludes>
+					</filter>
+				</filters>
+				<relocations>
+					<relocation>
+						<pattern>org.bouncycastle</pattern>
+						<shadedPattern>org.apache.pulsar.shade.org.bouncycastle</shadedPattern>
+					</relocation>
+					<relocation>
+						<pattern>com.scurrilous</pattern>
+						<shadedPattern>org.apache.pulsar.shade.com.scurrilous</shadedPattern>
+					</relocation>
+				</relocations>






[flink-connector-jdbc] 09/14: [FLINK-30790] Change Postgres tests to use new PostgresDatabase

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 97fa1144e788f25f9f2c321ea8d18bcbd45f41d6
Author: Joao Boto 
AuthorDate: Mon Jan 30 17:30:33 2023 +0100

[FLINK-30790] Change Postgres tests to use new PostgresDatabase
---
 .../jdbc/catalog/PostgresCatalogTestBase.java  | 26 ++---
 .../catalog/factory/JdbcCatalogFactoryTest.java| 26 ++---
 .../jdbc/databases/postgres/PostgresDatabase.java  | 46 ++-
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   | 65 +-
 4 files changed, 57 insertions(+), 106 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 2666bbc..bd3b982 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresDatabase;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -26,11 +26,6 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -38,17 +33,13 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 /** Test base for {@link PostgresCatalog}. */
-@Testcontainers
-class PostgresCatalogTestBase {
+class PostgresCatalogTestBase implements PostgresDatabase {
 
 public static final Logger LOG = 
LoggerFactory.getLogger(PostgresCatalogTestBase.class);
 
-protected static final DockerImageName POSTGRES_IMAGE =
-DockerImageName.parse(DockerImageVersions.POSTGRES);
-
 protected static final String TEST_CATALOG_NAME = "mypg";
-protected static final String TEST_USERNAME = "postgres";
-protected static final String TEST_PWD = "postgres";
+protected static final String TEST_USERNAME = CONTAINER.getUsername();
+protected static final String TEST_PWD = CONTAINER.getPassword();
 protected static final String TEST_DB = "test";
 protected static final String TEST_SCHEMA = "test_schema";
 protected static final String TABLE1 = "t1";
@@ -64,17 +55,10 @@ class PostgresCatalogTestBase {
 protected static String baseUrl;
 protected static PostgresCatalog catalog;
 
-@Container
-static final PostgreSQLContainer POSTGRES_CONTAINER =
-new PostgreSQLContainer<>(POSTGRES_IMAGE)
-.withUsername(TEST_USERNAME)
-.withPassword(TEST_PWD)
-.withLogConsumer(new Slf4jLogConsumer(LOG));
-
 @BeforeAll
 static void init() throws SQLException {
 // jdbc:postgresql://localhost:50807/postgres?user=postgres
-String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl();
+String jdbcUrl = CONTAINER.getJdbcUrl();
 // jdbc:postgresql://localhost:50807/
 baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index e3aaefc..705e494 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.catalog.factory;
 
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
 import org.apache.flink.connector.jdbc.catalog.PostgresCatalog;
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresDatabase;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -29,11 +29,6 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
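
The pattern in this series: each database gets an interface that owns a single started container, and test classes pick it up by implementing the interface instead of declaring their own @Container. A rough sketch of that shape with Testcontainers (the image tag and interface name are assumptions, not the actual PostgresDatabase code):

import org.testcontainers.containers.PostgreSQLContainer;

/** Hypothetical sketch: one shared, eagerly started container per database interface. */
public interface PostgresDatabaseSketch {

    // Interface fields are implicitly static final: one container per JVM,
    // reused by every test class that implements this interface.
    PostgreSQLContainer<?> CONTAINER = startContainer();

    static PostgreSQLContainer<?> startContainer() {
        PostgreSQLContainer<?> container = new PostgreSQLContainer<>("postgres:15.1");
        container.start();
        return container;
    }
}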

[flink-connector-jdbc] 06/14: [FLINK-30790] Change H2 and some Derby tests to new implementation

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit d2564a94f2da0a2dd803d87d89822f3bf251c54a
Author: Joao Boto 
AuthorDate: Mon Jan 30 16:52:28 2023 +0100

[FLINK-30790] Change H2 and some Derby tests to new implementation
---
 .../apache/flink/connector/jdbc/DbMetadata.java| 51 
 .../flink/connector/jdbc/JdbcDataTestBase.java |  5 +-
 .../apache/flink/connector/jdbc/JdbcITCase.java| 15 +++---
 .../flink/connector/jdbc/JdbcInputFormatTest.java  | 10 +---
 .../connector/jdbc/JdbcRowOutputFormatTest.java|  3 +-
 .../apache/flink/connector/jdbc/JdbcTestBase.java  | 12 ++---
 .../flink/connector/jdbc/JdbcTestFixture.java  | 30 
 .../connector/jdbc/databases/DatabaseMetadata.java | 39 +--
 .../jdbc/databases/derby/DerbyDatabase.java|  3 +-
 .../jdbc/databases/derby/DerbyMetadata.java|  1 -
 .../connector/jdbc/databases/h2/H2Metadata.java|  3 +-
 .../h2/xa}/H2XaConnectionWrapper.java  |  2 +-
 .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java  |  2 +-
 .../h2/xa}/H2XaResourceWrapper.java|  2 +-
 .../{xa/h2 => databases/h2/xa}/package-info.java   |  2 +-
 .../jdbc/databases/mysql/MySqlMetadata.java|  2 -
 .../jdbc/databases/oracle/OracleMetadata.java  |  1 -
 .../jdbc/databases/postgres/PostgresMetadata.java  |  1 -
 .../databases/sqlserver/SqlServerMetadata.java |  1 -
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java |  4 +-
 .../oracle/OracleExactlyOnceSinkE2eTest.java   |  4 +-
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  4 +-
 .../connector/jdbc/internal/JdbcFullTest.java  | 15 +++---
 .../jdbc/internal/JdbcTableOutputFormatTest.java   | 12 ++---
 .../jdbc/table/JdbcAppendOnlyWriterTest.java   | 16 +++
 .../jdbc/table/JdbcDynamicTableSinkITCase.java | 15 ++
 .../connector/jdbc/table/JdbcOutputFormatTest.java |  3 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java|  6 +--
 .../connector/jdbc/xa/JdbcXaFacadeImplTest.java|  8 ++--
 .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java |  8 ++--
 .../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java  | 12 ++---
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 10 ++--
 .../jdbc/xa/JdbcXaSinkNoInsertionTest.java |  9 +---
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java  | 16 +++
 .../flink/connector/jdbc/xa/h2/H2DbMetadata.java   | 56 --
 35 files changed, 130 insertions(+), 253 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
deleted file mode 100644
index 55c5317..000
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc;
-
-import javax.sql.XADataSource;
-
-import java.io.Serializable;
-
-/** Describes a database: driver, schema and urls. */
-public interface DbMetadata extends Serializable {
-
-default String getInitUrl() {
-return getUrl();
-}
-
-String getUrl();
-
-default String getUser() {
-return "";
-}
-
-default String getPassword() {
-return "";
-}
-
-XADataSource buildXaDataSource();
-
-String getDriverClass();
-
-default JdbcConnectionOptions toConnectionOptions() {
-return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-.withDriverName(getDriverClass())
-.withUrl(getUrl())
-.build();
-}
-}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index 003cf57..8b8a5aa 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -19,6 

[flink-connector-jdbc] 14/14: [FLINK-30790] Cleaning metadata deprecated methods and disableOnMac annotation

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 753ea0e87d05e0eb7b4419cd1653bd36ba986aa5
Author: Joao Boto 
AuthorDate: Wed Feb 8 12:05:08 2023 +0100

[FLINK-30790] Cleaning metadata deprecated methods and disableOnMac 
annotation
---
 .../flink/connector/jdbc/JdbcDataTestBase.java |  4 +--
 .../apache/flink/connector/jdbc/JdbcITCase.java|  8 +++---
 .../flink/connector/jdbc/JdbcInputFormatTest.java  | 30 +++---
 .../connector/jdbc/JdbcRowOutputFormatTest.java| 22 
 .../apache/flink/connector/jdbc/JdbcTestBase.java  |  2 +-
 .../flink/connector/jdbc/JdbcTestFixture.java  |  2 +-
 .../connector/jdbc/databases/DatabaseMetadata.java | 10 +---
 .../jdbc/databases/mysql/MySqlMetadata.java|  2 +-
 .../jdbc/databases/oracle/OracleDatabase.java  |  8 ++
 .../{OracleDatabase.java => OracleImages.java} | 25 ++
 .../jdbc/databases/oracle/OracleMetadata.java  | 10 ++--
 .../{OracleDatabase.java => OracleXaDatabase.java} |  7 ++---
 .../jdbc/databases/postgres/PostgresMetadata.java  |  2 +-
 .../databases/sqlserver/SqlServerMetadata.java |  3 ++-
 .../oracle/OracleExactlyOnceSinkE2eTest.java   | 14 ++
 .../jdbc/dialect/oracle/OracleTableSinkITCase.java |  3 ---
 .../dialect/oracle/OracleTableSourceITCase.java|  3 ---
 .../sqlserver/SqlServerTableSinkITCase.java|  2 +-
 .../connector/jdbc/internal/JdbcFullTest.java  |  8 +++---
 .../jdbc/internal/JdbcTableOutputFormatTest.java   |  8 +++---
 .../jdbc/table/JdbcAppendOnlyWriterTest.java   |  5 ++--
 .../jdbc/table/JdbcDynamicTableSinkITCase.java |  2 +-
 .../connector/jdbc/table/JdbcOutputFormatTest.java | 24 -
 .../jdbc/table/JdbcRowDataInputFormatTest.java | 24 -
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java|  7 +
 25 files changed, 87 insertions(+), 148 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index 8b8a5aa..de23d88 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -35,8 +35,8 @@ import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB
 import static org.mockito.Mockito.doReturn;
 
 /**
- * Base class for JDBC test using data from {@link JdbcTestFixture}. It uses 
{@link DerbyDbMetadata}
- * and inserts data before each test.
+ * Base class for JDBC test using data from {@link JdbcTestFixture}. It uses 
{@link
+ * org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata} and inserts 
data before each test.
  */
 public abstract class JdbcDataTestBase extends JdbcTestBase {
 @BeforeEach
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index 41a8082..8c521dc 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -74,7 +74,7 @@ public class JdbcITCase extends JdbcTestBase {
 String.format(INSERT_TEMPLATE, INPUT_TABLE),
 TEST_ENTRY_JDBC_STATEMENT_BUILDER,
 new JdbcConnectionOptionsBuilder()
-.withUrl(getMetadata().getUrl())
+.withUrl(getMetadata().getJdbcUrl())
 
.withDriverName(getMetadata().getDriverClass())
 .build()));
 env.execute();
@@ -108,7 +108,7 @@ public class JdbcITCase extends JdbcTestBase {
 ps.setString(2, e.content);
 },
 new JdbcConnectionOptionsBuilder()
-.withUrl(getMetadata().getUrl())
+.withUrl(getMetadata().getJdbcUrl())
 
.withDriverName(getMetadata().getDriverClass())
 .build()));
 env.execute();
@@ -118,7 +118,7 @@ public class JdbcITCase extends JdbcTestBase {
 
 private List selectWords() throws SQLException {
 ArrayList strings = new ArrayList<>();
-        try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) {
+        try (Connection connection = DriverManager.getConnection(getMetadata().getJdbcUrl())) {
 try (Statement st 

[flink-connector-jdbc] 07/14: [FLINK-30790] Change Oracle tests to use new OracleDatabase

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit a4e200fb839b8ad7b82ab7aea7f62415d14d4a4e
Author: Joao Boto 
AuthorDate: Mon Jan 30 17:22:05 2023 +0100

[FLINK-30790] Change Oracle tests to use new OracleDatabase
---
 .../connector/jdbc/databases/DatabaseMetadata.java |  2 +
 .../jdbc/databases/derby/DerbyMetadata.java|  5 ++
 .../connector/jdbc/databases/h2/H2Metadata.java|  5 ++
 .../jdbc/databases/mysql/MySqlMetadata.java|  5 ++
 .../jdbc/databases/oracle/OracleDatabase.java  |  9 +-
 .../jdbc/databases/oracle/OracleMetadata.java  |  5 ++
 .../jdbc/databases/postgres/PostgresMetadata.java  |  5 ++
 .../databases/sqlserver/SqlServerMetadata.java |  5 ++
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java |  5 --
 .../jdbc/dialect/oracle/OracleContainer.java   | 99 --
 .../oracle/OracleExactlyOnceSinkE2eTest.java   | 25 ++
 .../jdbc/dialect/oracle/OracleTableSinkITCase.java | 66 +--
 .../dialect/oracle/OracleTableSourceITCase.java| 33 +---
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  5 --
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java|  6 +-
 15 files changed, 109 insertions(+), 171 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
index 2d3fbec..3eeed1d 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
@@ -39,6 +39,8 @@ public interface DatabaseMetadata extends Serializable {
 
 String getJdbcUrl();
 
+String getJdbcUrlWithCredentials();
+
 String getUsername();
 
 String getPassword();
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
index 33907db..960a56c 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
@@ -40,6 +40,11 @@ public class DerbyMetadata implements DatabaseMetadata {
 return String.format("jdbc:derby:%s", dbName);
 }
 
+@Override
+public String getJdbcUrlWithCredentials() {
+return getJdbcUrl();
+}
+
 @Override
 public String getUsername() {
 return "";
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
index d94e4d6..95db4c6 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
@@ -36,6 +36,11 @@ public class H2Metadata implements DatabaseMetadata {
 return String.format("jdbc:h2:mem:%s", schema);
 }
 
+@Override
+public String getJdbcUrlWithCredentials() {
+return getJdbcUrl();
+}
+
 @Override
 public String getUsername() {
 return "";
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
index 9125232..58956ad 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java
@@ -52,6 +52,11 @@ public class MySqlMetadata implements DatabaseMetadata {
 return this.url;
 }
 
+@Override
+public String getJdbcUrlWithCredentials() {
+        return String.format("%s?user=%s&password=%s", this.url, this.username, this.password);
+}
+
 @Override
 public String getUsername() {
 return this.username;
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
index d800e38..70e503e 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
@@ -19,9 +19,10 @@ package org.apache.flink.connector.jdbc.databases.oracle;
 
 import 

[flink-connector-jdbc] 11/14: [FLINK-30790] Remove DockerImageVersions

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 12720ae4135048b1300440991fb3b5d10fc5d8f7
Author: Joao Boto 
AuthorDate: Tue Jan 31 16:45:34 2023 +0100

[FLINK-30790] Remove DockerImageVersions
---
 .../jdbc/catalog/MySql56CatalogITCase.java |  5 +--
 .../jdbc/catalog/MySql57CatalogITCase.java |  5 +--
 .../connector/jdbc/catalog/MySqlCatalogITCase.java |  4 +-
 .../jdbc/databases/mysql/MySqlDatabase.java|  8 ++--
 .../jdbc/databases/oracle/OracleDatabase.java  |  6 ++-
 .../jdbc/databases/postgres/PostgresDatabase.java  |  8 ++--
 .../databases/sqlserver/SqlServerDatabase.java | 11 +-
 .../sqlserver/SqlServerTableSinkITCase.java|  3 --
 .../sqlserver/SqlServerTableSourceITCase.java  |  3 --
 .../connector/jdbc/test/DockerImageVersions.java   | 45 --
 10 files changed, 28 insertions(+), 70 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
index 3a1c554..fb553c6 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -29,8 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 public class MySql56CatalogITCase extends MySqlCatalogTestBase {
 
 @Container
-private static final MySQLContainer CONTAINER =
-createContainer(DockerImageVersions.MYSQL_5_6);
+private static final MySQLContainer CONTAINER = 
createContainer(MySqlDatabase.MYSQL_5_6);
 
 @Override
 protected String getDatabaseUrl() {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
index 350bea8..9d0edcf 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -29,8 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 public class MySql57CatalogITCase extends MySqlCatalogTestBase {
 
 @Container
-private static final MySQLContainer CONTAINER =
-createContainer(DockerImageVersions.MYSQL_5_7);
+private static final MySQLContainer CONTAINER = 
createContainer(MySqlDatabase.MYSQL_5_7);
 
 @Override
 protected String getDatabaseUrl() {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index 73b5acf..3480fa7 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
@@ -29,7 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 public class MySqlCatalogITCase extends MySqlCatalogTestBase {
 
 @Container
-private static final MySQLContainer CONTAINER = 
createContainer(DockerImageVersions.MYSQL);
+private static final MySQLContainer CONTAINER = 
createContainer(MySqlDatabase.MYSQL_8_0);
 
 @Override
 protected String getDatabaseUrl() {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java
index f8e70a9..6e8c69e 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java

[flink-connector-jdbc] 13/14: [FLINK-30790] Small test fixings

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit cbb1c3538ee6b93e086eca7efffeac73279e98c9
Author: Joao Boto 
AuthorDate: Tue Feb 7 16:12:25 2023 +0100

[FLINK-30790] Small test fixings
---
 flink-connector-jdbc/pom.xml   |  2 +-
 .../flink/connector/jdbc/JdbcInputFormatTest.java  | 14 ---
 .../jdbc/catalog/MySqlCatalogTestBase.java | 18 -
 .../jdbc/databases/derby/DerbyDatabase.java|  2 +-
 .../connector/jdbc/databases/h2/H2XaDatabase.java  |  2 +-
 .../jdbc/databases/mysql/MySqlDatabase.java|  2 +-
 .../jdbc/databases/oracle/OracleDatabase.java  |  5 +--
 .../jdbc/databases/oracle/OracleMetadata.java  | 14 +--
 .../jdbc/databases/postgres/PostgresDatabase.java  |  2 +-
 .../databases/sqlserver/SqlServerDatabase.java |  2 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 12 +-
 .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java  | 44 +-
 .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java |  8 +++-
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 13 +--
 .../jdbc/xa/JdbcXaSinkNoInsertionTest.java |  3 +-
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java  | 21 ++-
 16 files changed, 66 insertions(+), 98 deletions(-)

diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index fa09d49..5fcdb41 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -39,7 +39,7 @@ under the License.
2.12.7
3.23.1
42.5.1
-   19.3.0.0
+   21.8.0.0
1.12.10

 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index ca6baa2..d766f07 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -29,8 +29,10 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 
 import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.ROW_TYPE_INFO;
@@ -180,8 +182,7 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
 }
 
 @Test
-    void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise()
-            throws SQLException, ClassNotFoundException {
+    void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException {
 jdbcInputFormat =
 JdbcInputFormat.buildJdbcInputFormat()
 .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -191,10 +192,11 @@ class JdbcInputFormatTest extends JdbcDataTestBase {
 .finish();
 jdbcInputFormat.openInputFormat();
 
-        final int defaultFetchSize =
-                DERBY_EBOOKSHOP_DB.getConnection().createStatement().getFetchSize();
-
-        assertThat(jdbcInputFormat.getStatement().getFetchSize()).isEqualTo(defaultFetchSize);
+        try (Connection dbConn = DERBY_EBOOKSHOP_DB.getConnection();
+                Statement dbStatement = dbConn.createStatement();
+                Statement inputStatement = jdbcInputFormat.getStatement()) {
+            assertThat(inputStatement.getFetchSize()).isEqualTo(dbStatement.getFetchSize());
+        }
 }
 
 @Test
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index f8b9b07..e46b029 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -257,7 +257,7 @@ abstract class MySqlCatalogTestBase {
 }
 
 @Test
-void testGetDb_DatabaseNotExistException() throws Exception {
+void testGetDb_DatabaseNotExistException() {
 String databaseNotExist = "nonexistent";
 assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
 .satisfies(
@@ -275,7 +275,7 @@ abstract class MySqlCatalogTestBase {
 }
 
 @Test
-void testDbExists() throws Exception {
+void testDbExists() {
 String databaseNotExist = "nonexistent";
 assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
 assertThat(catalog.databaseExists(TEST_DB)).isTrue();
@@ -296,7 +296,7 @@ abstract class MySqlCatalogTestBase {
 }
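
The JdbcInputFormatTest hunk in this commit replaces a leaked Connection and Statement with try-with-resources. The same shape in isolation (a hypothetical helper, assuming the caller hands over a statement to compare and close):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/** Sketch: try-with-resources closes every JDBC handle even when the check fails. */
final class FetchSizeCheckSketch {

    static void assertSameFetchSize(String jdbcUrl, Statement inputStatement) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
                Statement dbStatement = conn.createStatement();
                Statement toClose = inputStatement) {
            if (toClose.getFetchSize() != dbStatement.getFetchSize()) {
                throw new AssertionError("fetch sizes differ");
            }
        }
    }
}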
 

[flink-connector-jdbc] 08/14: [FLINK-30790] Change SqlServer tests to use new SqlServerDatabase

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 5623cb7cd76e697e5c5c5a271eadb5402622f181
Author: Joao Boto 
AuthorDate: Mon Jan 30 17:20:47 2023 +0100

[FLINK-30790] Change SqlServer tests to use new SqlServerDatabase
---
 .../sqlserver/SqlServerTableSinkITCase.java| 96 +++---
 .../sqlserver/SqlServerTableSourceITCase.java  | 32 
 2 files changed, 66 insertions(+), 62 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
index b40466b..42daf51 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.dialect.sqlserver;
 
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.databases.sqlserver.SqlServerDatabase;
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -48,9 +49,8 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.MSSQLServerContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -69,15 +69,8 @@ import static org.apache.flink.table.api.Expressions.$;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 
 /** The Table Sink ITCase for {@link SqlServerDialect}. */
-@Testcontainers
-class SqlServerTableSinkITCase extends AbstractTestBase {
-
-@Container
-private static final MSSQLServerContainer<?> container =
-new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
-.acceptLicense();
-
-private static String containerUrl;
+@DisabledOnOs(OS.MAC)
+class SqlServerTableSinkITCase extends AbstractTestBase implements 
SqlServerDatabase {
 
 public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
 public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
@@ -88,14 +81,12 @@ class SqlServerTableSinkITCase extends AbstractTestBase {
 
 @BeforeAll
 static void beforeAll() throws ClassNotFoundException, SQLException {
-containerUrl =
-String.format(
-"%s;username=%s;password=%s",
-container.getJdbcUrl(), container.getUsername(), 
container.getPassword());
-Class.forName(container.getDriverClassName());
+Class.forName(CONTAINER.getDriverClassName());
 try (Connection conn =
 DriverManager.getConnection(
-containerUrl, container.getUsername(), 
container.getPassword());
+CONTAINER.getJdbcUrl(),
+CONTAINER.getUsername(),
+CONTAINER.getPassword());
 Statement stat = conn.createStatement()) {
 stat.executeUpdate(
 "CREATE TABLE "
@@ -143,10 +134,12 @@ class SqlServerTableSinkITCase extends AbstractTestBase {
 @AfterAll
 static void afterAll() throws Exception {
 TestValuesTableFactory.clearAllData();
-Class.forName(container.getDriverClassName());
+Class.forName(CONTAINER.getDriverClassName());
 try (Connection conn =
 DriverManager.getConnection(
-containerUrl, container.getUsername(), 
container.getPassword());
+CONTAINER.getJdbcUrl(),
+CONTAINER.getUsername(),
+CONTAINER.getPassword());
 Statement stat = conn.createStatement()) {
 stat.execute("DROP TABLE " + OUTPUT_TABLE1);
 stat.execute("DROP TABLE " + OUTPUT_TABLE2);
@@ -155,7 +148,6 @@ class SqlServerTableSinkITCase extends AbstractTestBase {
 stat.execute("DROP TABLE " + OUTPUT_TABLE5);
 stat.execute("DROP TABLE " + USER_TABLE);
 }
-container.stop();
 }
 
 public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleDataStream(
@@ -207,21 
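
The commit replaces the per-class @Container field with a CONTAINER constant
inherited from the SqlServerDatabase interface. A hedged sketch of that pattern
(interface and class names here are illustrative, not the connector's exact
sources):

import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
interface SharedSqlServerDatabase {
    // Interface fields are implicitly static and final, so every test class
    // implementing this interface shares one container instance.
    @Container
    MSSQLServerContainer<?> CONTAINER =
            new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
                    .acceptLicense();
}

class SomeSinkITCase implements SharedSqlServerDatabase {
    void connect() throws Exception {
        // No per-class setup: the extension starts CONTAINER once, and tests
        // read its URL and credentials directly, as beforeAll() does above.
        java.sql.DriverManager.getConnection(
                        CONTAINER.getJdbcUrl(), CONTAINER.getUsername(), CONTAINER.getPassword())
                .close();
    }
}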

[flink-connector-jdbc] 10/14: [FLINK-30790] Change MySql tests to use new MySqlDatabase

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 0bc576ae52b965d3ef4d56b4a4b85459cbf7fa7c
Author: Joao Boto 
AuthorDate: Mon Jan 30 17:41:33 2023 +0100

[FLINK-30790] Change MySql tests to use new MySqlDatabase
---
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 194 +
 .../jdbc/table/UnsignedTypeConversionITCase.java   |  37 +---
 2 files changed, 7 insertions(+), 224 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
index 7100c2b..81d4690 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
@@ -1,47 +1,19 @@
 package org.apache.flink.connector.jdbc.dialect.mysql;
 
-import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
-import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata;
-import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import com.mysql.cj.jdbc.MysqlXADataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.MySQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
 import javax.sql.XADataSource;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for 
issues with errors on
  * closing connections.
  */
-@Testcontainers
-public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
-
-@Container
-private static final MySqlXaContainer CONTAINER =
-new MySqlXaContainer(DockerImageVersions.MYSQL)
-.withLockWaitTimeout(
-(CHECKPOINT_TIMEOUT_MS + 
TASK_CANCELLATION_TIMEOUT_MS) * 2);
-
-@Override
-public DatabaseMetadata getMetadata() {
-return new MySqlMetadata(CONTAINER);
-}
+public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest
+implements MySqlDatabase {
 
 @Override
 public SerializableSupplier<XADataSource> getDataSourceSupplier() {
@@ -53,164 +25,4 @@ public class MySqlExactlyOnceSinkE2eTest extends 
JdbcExactlyOnceSinkE2eTest {
 return xaDataSource;
 };
 }
-
-/** {@link MySQLContainer} with XA enabled. */
-static class MySqlXaContainer extends MySQLContainer<MySqlXaContainer> {
-private long lockWaitTimeout = 0;
-private volatile InnoDbStatusLogger innoDbStatusLogger;
-
-public MySqlXaContainer(String dockerImageName) {
-super(DockerImageName.parse(dockerImageName));
-}
-
-public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) {
-checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be 
greater than 0");
-this.lockWaitTimeout = lockWaitTimeout;
-return this.self();
-}
-
-@Override
-public void start() {
-super.start();
-// prevent XAER_RMERR: Fatal error occurred in the transaction  
branch - check your
-// data for consistency works for mysql v8+
-try (Connection connection =
-DriverManager.getConnection(getJdbcUrl(), "root", 
getPassword())) {
-prepareDb(connection, lockWaitTimeout);
-} catch (SQLException e) {
-ExceptionUtils.rethrow(e);
-}
-
-this.innoDbStatusLogger =
-new InnoDbStatusLogger(
-getJdbcUrl(), "root", getPassword(), 
lockWaitTimeout / 2);
-innoDbStatusLogger.start();
-}
-
-@Override
-public void stop() {
-try {
-innoDbStatusLogger.stop();
-} catch (Exception e) {
-ExceptionUtils.rethrow(e);
-} finally {
-super.stop();
-}
-}
-
-private void prepareDb(Connection connection, long lockWaitTimeout) 
throws SQLException {
-try (Statement st = connection.createStatement()) {
-st.execute("GRANT 

[flink-connector-jdbc] 12/14: [FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 867fc0f29a1eb140af0201d65cfc57eaf447dad0
Author: Joao Boto 
AuthorDate: Tue Jan 31 17:52:50 2023 +0100

[FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure
---
 .../apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java | 2 +-
 .../org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java   | 2 +-
 .../flink/connector/jdbc/databases/oracle/OracleDatabase.java  | 5 +++--
 .../connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java  | 7 +++
 .../jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java  | 7 +++
 .../jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java  | 7 +++
 .../apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 2 +-
 7 files changed, 27 insertions(+), 5 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
index 960a56c..087aa58 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java
@@ -23,7 +23,7 @@ import org.apache.derby.jdbc.EmbeddedXADataSource;
 
 import javax.sql.XADataSource;
 
-/** DerbyDbMetadata. */
+/** Derby Metadata. */
 public class DerbyMetadata implements DatabaseMetadata {
 private final String dbName;
 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
index 95db4c6..ee41685 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper;
 
 import javax.sql.XADataSource;
 
-/** H2DbMetadata. */
+/** H2 Metadata. */
 public class H2Metadata implements DatabaseMetadata {
 
 private final String schema;
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
index 13269d2..0ba1f0e 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java
@@ -29,14 +29,15 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 @Testcontainers
 public interface OracleDatabase extends DatabaseTest {
 
-String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim-faststart";
+String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim";
 String ORACLE_21 = "gvenzl/oracle-xe:21.3.0-slim-faststart";
 
 @Container
 JdbcDatabaseContainer<?> CONTAINER =
 new OracleContainer(ORACLE_21)
 .withStartupTimeoutSeconds(240)
-.withConnectTimeoutSeconds(120);
+.withConnectTimeoutSeconds(120)
+.usingSid();
 
 @Override
 default DatabaseMetadata getMetadata() {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
index 81d4690..24fa8df 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
@@ -1,6 +1,8 @@
 package org.apache.flink.connector.jdbc.dialect.mysql;
 
+import org.apache.flink.connector.jdbc.databases.DatabaseMetadata;
 import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
+import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata;
 import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
 import org.apache.flink.util.function.SerializableSupplier;
 
@@ -15,6 +17,11 @@ import javax.sql.XADataSource;
 public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest
 implements MySqlDatabase {
 
+@Override
+public DatabaseMetadata getMetadata() {
+return new MySqlMetadata(CONTAINER, true);
+}
+
 @Override
 public SerializableSupplier<XADataSource> getDataSourceSupplier() {
 return () -> {
diff --git 

[flink-connector-jdbc] branch main updated (ffee715 -> 753ea0e)

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


from ffee715  [FLINK-30963][ci] Streamline binary URL configuration
 new 45306bb  [FLINK-30790] Change log level to off on CI
 new 0246064  [FLINK-30790] Refactor JdbcDataTypeTest create tests by 
dialect
 new ba9dbc9  [FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create 
tests by dialect
 new 7addee1  [FLINK-30790] Refactor MySqlCatalogITCase create tests by 
database version
 new b0a25a6  [FLINK-30790] Create unified databases for testing
 new d2564a9  [FLINK-30790] Change H2 and some Derby tests to new 
implementation
 new a4e200f  [FLINK-30790] Change Oracle tests to use new OracleDatabase
 new 5623cb7  [FLINK-30790] Change SqlServer tests to use new 
SqlServerDatabase
 new 97fa114  [FLINK-30790] Change Postgres tests to use new 
PostgresDatabase
 new 0bc576a  [FLINK-30790] Change MySql tests to use new MySqlDatabase
 new 12720ae  [FLINK-30790] Remove DockerImageVersions
 new 867fc0f  [FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure
 new cbb1c35  [FLINK-30790] Small test fixings
 new 753ea0e  [FLINK-30790] Cleaning metadata deprecated methods and 
disableOnMac annotation

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../6b9ab1b0-c14d-4667-bab5-407b81fba98b   |  12 +
 flink-connector-jdbc/pom.xml   |   2 +-
 .../flink/connector/jdbc/JdbcDataTestBase.java |   9 +-
 .../flink/connector/jdbc/JdbcDataTypeTest.java | 230 -
 .../apache/flink/connector/jdbc/JdbcITCase.java|  15 +-
 .../flink/connector/jdbc/JdbcInputFormatTest.java  |  52 +-
 .../connector/jdbc/JdbcRowOutputFormatTest.java|  25 +-
 .../apache/flink/connector/jdbc/JdbcTestBase.java  |  12 +-
 .../flink/connector/jdbc/JdbcTestFixture.java  |  31 +-
 ...ablePathTest.java => MySql56CatalogITCase.java} |  22 +-
 ...ablePathTest.java => MySql57CatalogITCase.java} |  22 +-
 .../connector/jdbc/catalog/MySqlCatalogITCase.java | 366 +-
 .../jdbc/catalog/MySqlCatalogTestBase.java | 396 +--
 .../jdbc/catalog/PostgresCatalogTestBase.java  |  26 +-
 .../catalog/factory/JdbcCatalogFactoryTest.java|  26 +-
 .../DatabaseMetadata.java} |  38 +-
 .../DatabaseTest.java} |  12 +-
 .../jdbc/databases/derby/DerbyDatabase.java|  51 ++
 .../derby/DerbyMetadata.java}  |  37 +-
 .../h2/H2Metadata.java}|  44 +-
 .../connector/jdbc/databases/h2/H2XaDatabase.java  |  50 ++
 .../h2/xa}/H2XaConnectionWrapper.java  |   2 +-
 .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java  |   4 +-
 .../h2/xa}/H2XaResourceWrapper.java|   2 +-
 .../{xa/h2 => databases/h2/xa}/package-info.java   |   2 +-
 .../jdbc/databases/mysql/MySqlDatabase.java| 214 
 .../jdbc/databases/mysql/MySqlMetadata.java|  92 
 .../oracle/OracleDatabase.java}|  38 +-
 .../oracle/OracleImages.java}  |  12 +-
 .../jdbc/databases/oracle/OracleMetadata.java  |  97 
 .../oracle/OracleXaDatabase.java}  |  39 +-
 .../jdbc/databases/postgres/PostgresDatabase.java  |  85 
 .../jdbc/databases/postgres/PostgresMetadata.java  |  92 
 .../databases/sqlserver/SqlServerDatabase.java |  47 ++
 .../databases/sqlserver/SqlServerMetadata.java |  93 
 .../jdbc/dialect/JdbcDialectTypeTest.java  | 135 +
 .../jdbc/dialect/mysql/MySqlDialectTypeTest.java   |  65 +++
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java |  35 ++
 .../jdbc/dialect/oracle/OracleContainer.java   |  99 
 .../jdbc/dialect/oracle/OracleDialectTypeTest.java |  60 +++
 .../oracle/OracleExactlyOnceSinkE2eTest.java   |  31 ++
 .../jdbc/dialect/oracle/OracleTableSinkITCase.java |  63 ++-
 .../dialect/oracle/OracleTableSourceITCase.java|  30 +-
 .../dialect/postgres/PostgresDialectTypeTest.java  |  65 +++
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  35 ++
 .../sqlserver/SqlServerTableSinkITCase.java|  93 ++--
 .../sqlserver/SqlServerTableSourceITCase.java  |  29 +-
 .../connector/jdbc/internal/JdbcFullTest.java  |  15 +-
 .../jdbc/internal/JdbcTableOutputFormatTest.java   |  12 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java   |  17 +-
 .../jdbc/table/JdbcDynamicTableSinkITCase.java |  15 +-
 .../connector/jdbc/table/JdbcOutputFormatTest.java |  27 +-
 .../jdbc/table/JdbcRowDataInputFormatTest.java |  24 +-
 .../jdbc/table/UnsignedTypeConversionITCase.java   |  37 +-
 

[flink-connector-jdbc] 04/14: [FLINK-30790] Refactor MySqlCatalogITCase create tests by database version

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 7addee18872af6333babbf6314fe073699dbd2f1
Author: Joao Boto 
AuthorDate: Mon Jan 30 11:38:55 2023 +0100

[FLINK-30790] Refactor MySqlCatalogITCase create tests by database version
---
 .../6b9ab1b0-c14d-4667-bab5-407b81fba98b   |  12 +
 .../jdbc/catalog/MySql56CatalogITCase.java |  39 ++
 .../jdbc/catalog/MySql57CatalogITCase.java |  39 ++
 .../connector/jdbc/catalog/MySqlCatalogITCase.java | 366 +--
 .../jdbc/catalog/MySqlCatalogTestBase.java | 396 ++---
 .../sqlserver/SqlServerTableSinkITCase.java|   3 -
 .../sqlserver/SqlServerTableSourceITCase.java  |   3 -
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java|  12 +-
 8 files changed, 459 insertions(+), 411 deletions(-)

diff --git 
a/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b 
b/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
index 9af409a..e54d3ff 100644
--- 
a/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
+++ 
b/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
@@ -8,6 +8,18 @@ org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase 
does not satisfy: onl
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.MySql57CatalogITCase does not satisfy: 
only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.MySql56CatalogITCase does not satisfy: 
only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
 org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not 
satisfy: only one of the following predicates match:\
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
new file mode 100644
index 000..2ff3ed0
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with 
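
For reference, the field shape that satisfies the ArchUnit rule quoted above
for tests outside org.apache.flink.runtime.* is a static, final
MiniClusterExtension registered with @RegisterExtension. A sketch with
illustrative configuration values (assuming Flink's flink-test-utils JUnit 5
extension; not taken verbatim from this commit):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

class SomeConnectorITCase {
    // Static, final, of type MiniClusterExtension, and annotated with
    // @RegisterExtension: exactly one of the rule's accepted predicates.
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());
}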

[flink-connector-jdbc] 03/14: [FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create tests by dialect

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit ba9dbc91a6dc19a49683dc6b558e46be61fd1c88
Author: Joao Boto 
AuthorDate: Mon Jan 30 15:01:22 2023 +0100

[FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create tests by dialect
---
 .../apache/flink/connector/jdbc/DbMetadata.java|   4 +-
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 220 +
 .../jdbc/dialect/mysql/MySqlMetadata.java  |  82 
 .../oracle/OracleExactlyOnceSinkE2eTest.java   |  46 ++
 .../jdbc/dialect/oracle/OracleMetadata.java|  87 
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  91 
 .../jdbc/dialect/postgres/PostgresMetadata.java|  82 
 .../sqlserver/SqlServerTableSinkITCase.java|   3 +
 .../sqlserver/SqlServerTableSourceITCase.java  |   3 +
 .../connector/jdbc/test/DockerImageVersions.java   |  19 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 541 ++---
 11 files changed, 671 insertions(+), 507 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
index 21f93d1..55c5317 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
@@ -24,7 +24,9 @@ import java.io.Serializable;
 /** Describes a database: driver, schema and urls. */
 public interface DbMetadata extends Serializable {
 
-String getInitUrl();
+default String getInitUrl() {
+return getUrl();
+}
 
 String getUrl();
 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
new file mode 100644
index 000..1da2f7c
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java
@@ -0,0 +1,220 @@
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.connector.jdbc.DbMetadata;
+import org.apache.flink.connector.jdbc.test.DockerImageVersions;
+import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import com.mysql.cj.jdbc.MysqlXADataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.sql.XADataSource;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for 
issues with errors on
+ * closing connections.
+ */
+@Testcontainers
+public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest {
+
+@Container
+private static final MySqlXaContainer CONTAINER =
+new MySqlXaContainer(DockerImageVersions.MYSQL)
+.withLockWaitTimeout(
+(CHECKPOINT_TIMEOUT_MS + 
TASK_CANCELLATION_TIMEOUT_MS) * 2);
+
+@Override
+protected String getDockerVersion() {
+return CONTAINER.getDockerImageName();
+}
+
+@Override
+protected DbMetadata getDbMetadata() {
+return new MySqlMetadata(CONTAINER);
+}
+
+@Override
+public SerializableSupplier<XADataSource> getDataSourceSupplier() {
+return () -> {
+MysqlXADataSource xaDataSource = new MysqlXADataSource();
+xaDataSource.setUrl(CONTAINER.getJdbcUrl());
+xaDataSource.setUser(CONTAINER.getUsername());
+xaDataSource.setPassword(CONTAINER.getPassword());
+return xaDataSource;
+};
+}
+
+/** {@link MySQLContainer} with XA enabled. */
+static class MySqlXaContainer extends MySQLContainer<MySqlXaContainer> {
+private long lockWaitTimeout = 0;
+private volatile InnoDbStatusLogger innoDbStatusLogger;
+
+public MySqlXaContainer(String dockerImageName) {
+super(DockerImageName.parse(dockerImageName));
+}
+
+public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) {
+checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be 
greater than 0");
+this.lockWaitTimeout = lockWaitTimeout;
+return this.self();
+}
+
+@Override
+public void start() {
+ 
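
The getDataSourceSupplier() above hands the sink a serializable factory for
XA-capable data sources. A hedged sketch of how such a supplier's product is
consumed through the standard javax.sql API (URL and credentials are
placeholders, not the test container's):

import com.mysql.cj.jdbc.MysqlXADataSource;

import javax.sql.XAConnection;
import javax.sql.XADataSource;

import java.sql.Connection;

public class XaSupplierDemo {
    public static void main(String[] args) throws Exception {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl("jdbc:mysql://localhost:3306/test"); // placeholder
        xaDataSource.setUser("user");                            // placeholder
        xaDataSource.setPassword("secret");                      // placeholder
        XADataSource ds = xaDataSource;

        // An XAConnection exposes both a distributed-transaction resource
        // (getXAResource()) and a plain JDBC view (getConnection()).
        XAConnection xaConnection = ds.getXAConnection();
        try (Connection connection = xaConnection.getConnection()) {
            connection.createStatement().execute("SELECT 1");
        } finally {
            xaConnection.close(); // XAConnection is not AutoCloseable
        }
    }
}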

[flink-connector-jdbc] 02/14: [FLINK-30790] Refactor JdbcDataTypeTest create tests by dialect

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 0246064b1262d6e8889dfdc96a1ea1e6a6a640aa
Author: Joao Boto 
AuthorDate: Mon Jan 30 10:30:08 2023 +0100

[FLINK-30790] Refactor JdbcDataTypeTest create tests by dialect
---
 .../flink/connector/jdbc/JdbcDataTypeTest.java | 230 -
 .../jdbc/dialect/JdbcDialectTypeTest.java  | 135 
 .../jdbc/dialect/mysql/MySqlDialectTypeTest.java   |  65 ++
 .../jdbc/dialect/oracle/OracleDialectTypeTest.java |  60 ++
 .../dialect/postgres/PostgresDialectTypeTest.java  |  65 ++
 5 files changed, 325 insertions(+), 230 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
deleted file mode 100644
index 75be19a..000
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.jdbc;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for all DataTypes and Dialects of JDBC connector. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class JdbcDataTypeTest {
-
-private static final String DDL_FORMAT =
-"CREATE TABLE T(\n"
-+ "f0 %s\n"
-+ ") WITH (\n"
-+ "  'connector'='jdbc',\n"
-+ "  'url'='"
-+ "jdbc:%s:memory:test"
-+ "',\n"
-+ "  'table-name'='myTable'\n"
-+ ")";
-
-@Parameters(name = "{0}")
-public static List testData() {
-return Arrays.asList(
-createTestItem("derby", "CHAR"),
-createTestItem("derby", "VARCHAR"),
-createTestItem("derby", "BOOLEAN"),
-createTestItem("derby", "TINYINT"),
-createTestItem("derby", "SMALLINT"),
-createTestItem("derby", "INTEGER"),
-createTestItem("derby", "BIGINT"),
-createTestItem("derby", "FLOAT"),
-createTestItem("derby", "DOUBLE"),
-createTestItem("derby", "DECIMAL(10, 4)"),
-createTestItem("derby", "DATE"),
-createTestItem("derby", "TIME"),
-createTestItem("derby", "TIMESTAMP(3)"),
-createTestItem("derby", "TIMESTAMP WITHOUT TIME ZONE"),
-createTestItem("derby", "TIMESTAMP(9) WITHOUT TIME ZONE"),
-createTestItem("derby", "VARBINARY"),
-createTestItem("mysql", "CHAR"),
-createTestItem("mysql", "VARCHAR"),
-createTestItem("mysql", "BOOLEAN"),
-createTestItem("mysql", "TINYINT"),
-createTestItem("mysql", "SMALLINT"),
-createTestItem("mysql", "INTEGER"),
-createTestItem("mysql", "BIGINT"),
-createTestItem("mysql", "FLOAT"),
-createTestItem("mysql", "DOUBLE"),
-createTestItem("mysql", "DECIMAL(10, 4)"),
-createTestItem("mysql", "DECIMAL(38, 18)"),
-createTestItem("mysql", "DATE"),
-
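
The refactor splits this single parameterized test into a dialect-agnostic base
plus one small test class per dialect (JdbcDialectTypeTest and friends in the
file summary). A hedged sketch of that shape, reusing the DDL_FORMAT template
above; class and method names here are illustrative, not the connector's:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

abstract class DialectTypeTestSketch {
    private static final String DDL_FORMAT =
            "CREATE TABLE T(\n"
                    + "f0 %s\n"
                    + ") WITH (\n"
                    + "  'connector'='jdbc',\n"
                    + "  'url'='jdbc:%s:memory:test',\n"
                    + "  'table-name'='myTable'\n"
                    + ")";

    /** Each dialect subclass names itself once... */
    abstract String dialect();

    /** ...and contributes only its own supported types. */
    abstract List<String> dataTypes();

    List<String> ddlStatements() {
        return dataTypes().stream()
                .map(type -> String.format(DDL_FORMAT, type, dialect()))
                .collect(Collectors.toList());
    }
}

class DerbyTypeTestSketch extends DialectTypeTestSketch {
    @Override
    String dialect() {
        return "derby";
    }

    @Override
    List<String> dataTypes() {
        return Arrays.asList("CHAR", "VARCHAR", "BOOLEAN", "DECIMAL(10, 4)");
    }
}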

[flink-connector-jdbc] 05/14: [FLINK-30790] Create unified databases for testing

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit b0a25a688679595b0f2d128fb4b88758ce8629da
Author: Joao Boto 
AuthorDate: Thu Feb 2 09:12:37 2023 +0100

[FLINK-30790] Create unified databases for testing
---
 .../flink/connector/jdbc/JdbcTestFixture.java  |  5 +-
 .../jdbc/catalog/MySql56CatalogITCase.java |  2 +-
 .../jdbc/catalog/MySql57CatalogITCase.java |  2 +-
 .../connector/jdbc/databases/DatabaseMetadata.java | 40 ++
 .../connector/jdbc/databases/DatabaseTest.java | 24 +
 .../jdbc/databases/derby/DerbyDatabase.java| 50 +
 .../derby/DerbyMetadata.java}  | 31 +++
 .../h2/H2Metadata.java}| 47 
 .../connector/jdbc/databases/h2/H2XaDatabase.java  | 50 +
 .../mysql/MySqlDatabase.java}  | 62 ++
 .../mysql/MySqlMetadata.java   | 37 +++--
 .../jdbc/databases/oracle/OracleDatabase.java  | 38 +
 .../oracle/OracleMetadata.java | 36 +++--
 .../jdbc/databases/postgres/PostgresDatabase.java  | 43 +++
 .../postgres/PostgresMetadata.java | 34 +++-
 .../databases/sqlserver/SqlServerDatabase.java | 40 ++
 .../sqlserver/SqlServerMetadata.java}  | 38 +++--
 .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java |  1 +
 .../oracle/OracleExactlyOnceSinkE2eTest.java   |  1 +
 .../postgres/PostgresExactlyOnceSinkE2eTest.java   |  1 +
 .../flink/connector/jdbc/xa/h2/H2XaDsWrapper.java  |  2 +-
 21 files changed, 452 insertions(+), 132 deletions(-)

diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
index e9b2953..d00fbad 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata;
 import org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -74,8 +75,8 @@ public class JdbcTestFixture {
 };
 
 private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop";
-public static final DerbyDbMetadata DERBY_EBOOKSHOP_DB =
-new DerbyDbMetadata(EBOOKSHOP_SCHEMA_NAME);
+public static final DerbyMetadata DERBY_EBOOKSHOP_DB =
+new DerbyMetadata(EBOOKSHOP_SCHEMA_NAME);
 public static final H2DbMetadata H2_EBOOKSHOP_DB = new 
H2DbMetadata(EBOOKSHOP_SCHEMA_NAME);
 
 /** TestEntry. */
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
index 2ff3ed0..3a1c554 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java
@@ -24,7 +24,7 @@ import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
-/** E2E test for {@link MySqlCatalog}. */
+/** E2E test for {@link MySqlCatalog} with MySql version 5.6. */
 @Testcontainers
 public class MySql56CatalogITCase extends MySqlCatalogTestBase {
 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
index 0a1dc8b..350bea8 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java
@@ -24,7 +24,7 @@ import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
-/** E2E test for {@link MySqlCatalog}. */
+/** E2E test for {@link MySqlCatalog} with MySql version 5.7. */
 @Testcontainers
 public class MySql57CatalogITCase extends MySqlCatalogTestBase {
 
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
 

[flink-connector-jdbc] 01/14: [FLINK-30790] Change log level to off on CI

2023-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 45306bbdeb8f056b6c3cab89e7ee27a573672025
Author: Joao Boto 
AuthorDate: Sat Jan 28 21:54:38 2023 +0100

[FLINK-30790] Change log level to off on CI
---
 tools/ci/log4j.properties | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties
index 7daf1c3..6bea562 100644
--- a/tools/ci/log4j.properties
+++ b/tools/ci/log4j.properties
@@ -16,7 +16,9 @@
 # limitations under the License.
 

 
-rootLogger.level = INFO
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
 rootLogger.appenderRef.out.ref = ConsoleAppender
 
 # -



[flink-table-store] branch master updated: [FLINK-31038] Avoid accessing non-TableStore tables in HiveCatalog.listTables

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new bd6036cc [FLINK-31038] Avoid accessing non-TableStore tables in 
HiveCatalog.listTables
bd6036cc is described below

commit bd6036cc1f9cc853e04f45dac7b65b66ffdb669f
Author: Jingsong Lee 
AuthorDate: Mon Feb 13 20:16:40 2023 +0800

[FLINK-31038] Avoid accessing non-TableStore tables in 
HiveCatalog.listTables

This closes #528
---
 .../org/apache/flink/table/store/hive/HiveCatalog.java| 15 ---
 .../apache/flink/table/store/hive/HiveCatalogITCase.java  | 13 +
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 1b5deb88..90f0f9e2 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -157,9 +157,14 @@ public class HiveCatalog extends AbstractCatalog {
 try {
 return client.getAllTables(databaseName).stream()
 .filter(
-tableName ->
-tableStoreTableExists(
-new Identifier(databaseName, 
tableName), false))
+tableName -> {
+Identifier identifier = new 
Identifier(databaseName, tableName);
+// the environment here may not be able to 
access non-TableStore
+// tables.
+// so we just check the schema file first
+return schemaFileExists(identifier)
+&& tableStoreTableExists(identifier, 
false);
+})
 .collect(Collectors.toList());
 } catch (UnknownDBException e) {
 throw new DatabaseNotExistException(databaseName, e);
@@ -396,6 +401,10 @@ public class HiveCatalog extends AbstractCatalog {
 return tableStoreTableExists(identifier, true);
 }
 
+private boolean schemaFileExists(Identifier identifier) {
+return new SchemaManager(fileIO, 
getTableLocation(identifier)).latest().isPresent();
+}
+
 private boolean tableStoreTableExists(Identifier identifier, boolean 
throwException) {
 Table table;
 try {
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index f51a081c..bce1a980 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.connector.FlinkCatalog;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
@@ -372,6 +373,18 @@ public class HiveCatalogITCase {
 String.format("Field names %s cannot contain upper 
case", "[A, C]"));
 }
 
+@Test
+public void testQuickPathInShowTables() throws Exception {
+collect("CREATE TABLE t ( a INT, b STRING )");
+List<Row> tables = collect("SHOW TABLES");
+Assert.assertEquals("[+I[t]]", tables.toString());
+
+new LocalFileIO()
+.delete(new org.apache.flink.table.store.fs.Path(path, 
"test_db.db/t"), true);
+tables = collect("SHOW TABLES");
+Assert.assertEquals("[]", tables.toString());
+}
+
 private List<Row> collect(String sql) throws Exception {
 List<Row> result = new ArrayList<>();
 try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
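
The listTables change above works because && short-circuits: the cheap
schemaFileExists probe runs first and spares the expensive
tableStoreTableExists lookup for foreign tables. A self-contained sketch of the
same filtering idea (names and data are illustrative, not the catalog's):

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class QuickPathFilter {
    public static void main(String[] args) {
        List<String> allTables = Arrays.asList("t1", "hive_native", "t2");

        Predicate<String> schemaFileExists = name -> name.startsWith("t"); // cheap check
        Predicate<String> tableExistsFully = name -> {                     // expensive check
            System.out.println("full existence check for " + name);
            return true;
        };

        List<String> tables = allTables.stream()
                // && short-circuits: the expensive check runs only when the
                // cheap one passes, so "hive_native" is never fully probed.
                .filter(name -> schemaFileExists.test(name) && tableExistsFully.test(name))
                .collect(Collectors.toList());
        System.out.println(tables); // [t1, t2]
    }
}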



[flink-table-store] branch master updated (1796d3fa -> a9b33ad5)

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


from 1796d3fa [FLINK-31023] Introduce ConfigOption for table store
 add a9b33ad5 [FLINK-31024] Copy code splitter to table store from flink 
table

No new revisions were added by this update.

Summary of changes:
 flink-table-store-common/pom.xml   |  51 ++
 .../src/main/antlr4/JavaLexer.g4   | 207 +++
 .../src/main/antlr4/JavaParser.g4  | 635 +
 .../flink/table/store/codegen/GeneratedClass.java  |   2 +-
 .../codesplit/AddBoolBeforeReturnRewriter.java | 183 ++
 .../codegen/codesplit/BlockStatementGrouper.java   | 476 +++
 .../codegen/codesplit/BlockStatementRewriter.java  | 251 
 .../codegen/codesplit/BlockStatementSplitter.java  | 276 +
 .../codesplit/CodeRewriter.java}   |  24 +-
 .../store/codegen/codesplit/CodeSplitUtil.java |  51 ++
 .../codegen/codesplit/DeclarationRewriter.java | 269 +
 .../store/codegen/codesplit/FunctionSplitter.java  | 270 +
 .../store/codegen/codesplit/JavaCodeSplitter.java  |  57 ++
 .../codegen/codesplit/MemberFieldRewriter.java | 349 +++
 .../codegen/codesplit/ReturnAndJumpCounter.java|  38 ++
 .../codegen/codesplit/ReturnValueRewriter.java | 194 +++
 .../src/main/resources/META-INF/NOTICE |  16 +
 .../licenses/LICENSE.antlr-java-grammar-files  |  26 +
 .../META-INF/licenses/LICENSE.antlr-runtime|  26 +
 .../AddBooleanBeforeReturnRewriterTest.java|  74 +++
 .../codesplit/BlockStatementGrouperTest.java   | 165 ++
 .../codesplit/BlockStatementRewriterTest.java  |  94 +++
 .../codesplit/BlockStatementSplitterTest.java  |  77 +++
 .../codegen/codesplit/CodeRewriterTestBase.java|  83 +++
 .../store/codegen/codesplit/CodeSplitTestUtil.java | 124 
 .../codegen/codesplit/DeclarationRewriterTest.java |  59 ++
 .../codegen/codesplit/FunctionSplitterTest.java|  48 ++
 .../codegen/codesplit/JavaCodeSplitterTest.java| 121 
 .../store/codegen/codesplit/JavaParserTest.java|  66 +++
 .../codegen/codesplit/MemberFieldRewriterTest.java |  58 ++
 .../codegen/codesplit/ReturnValueRewriterTest.java |  48 ++
 .../code/TestAddBooleanBeforeReturn.java   |  19 +
 .../codesplit/add-boolean/code/TestNotRewrite.java |  18 +
 .../add-boolean/code/TestRewriteInnerClass.java|  32 ++
 .../code/TestSkipAnonymousClassAndLambda.java  |  27 +
 .../expected/TestAddBooleanBeforeReturn.java   |  21 +
 .../add-boolean/expected/TestNotRewrite.java   |  18 +
 .../expected/TestRewriteInnerClass.java|  35 ++
 .../expected/TestSkipAnonymousClassAndLambda.java  |  28 +
 .../block/code/TestIfInsideWhileLoopRewrite.java   | 100 
 .../TestIfMultipleSingleLineStatementRewrite.java  |  22 +
 .../block/code/TestIfStatementRewrite.java |  44 ++
 .../block/code/TestIfStatementRewrite1.java|  34 ++
 .../block/code/TestIfStatementRewrite2.java|  32 ++
 .../block/code/TestIfStatementRewrite3.java|  21 +
 ...ewriteIfStatementInFunctionWithReturnValue.java |  23 +
 .../block/code/TestRewriteInnerClass.java  |  23 +
 .../block/code/TestRewriteTwoStatements.java   |  69 +++
 .../block/code/TestWhileLoopInsideIfRewrite.java   |  46 ++
 .../codesplit/block/code/TestWhileLoopRewrite.java |  36 ++
 .../expected/TestIfInsideWhileLoopRewrite.java | 134 +
 .../TestIfMultipleSingleLineStatementRewrite.java  |  48 ++
 .../block/expected/TestIfStatementRewrite.java |  68 +++
 .../block/expected/TestIfStatementRewrite1.java|  80 +++
 .../block/expected/TestIfStatementRewrite2.java|  66 +++
 .../block/expected/TestIfStatementRewrite3.java|  43 ++
 ...ewriteIfStatementInFunctionWithReturnValue.java |  23 +
 .../block/expected/TestRewriteInnerClass.java  |  43 ++
 .../block/expected/TestRewriteTwoStatements.java   | 129 +
 .../expected/TestWhileLoopInsideIfRewrite.java |  80 +++
 .../block/expected/TestWhileLoopRewrite.java   |  62 ++
 ...LocalVariableAndMemberVariableWithSameName.java |  12 +
 .../code/TestLocalVariableWithSameName.java|  15 +
 ...riteLocalVariableInFunctionWithReturnValue.java |   8 +
 .../declaration/code/TestRewriteInnerClass.java|  17 +
 .../declaration/code/TestRewriteLocalVariable.java |   8 +
 .../code/TestRewriteLocalVariableInForLoop1.java   |   9 +
 .../code/TestRewriteLocalVariableInForLoop2.java   |   9 +
 ...LocalVariableAndMemberVariableWithSameName.java |  16 +
 .../expected/TestLocalVariableWithSameName.java|  22 +
 ...riteLocalVariableInFunctionWithReturnValue.java |   0
 .../expected/TestRewriteInnerClass.java|  25 +
 .../expected/TestRewriteLocalVariable.java |  12 +
 .../TestRewriteLocalVariableInForLoop1.java|  

[flink-table-store] branch master updated: [FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator may be out of order when allocating splits

2023-02-13 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new e9875b4d [FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator 
may be out of order when allocating splits
e9875b4d is described below

commit e9875b4d0bb9d3ccd05b51349ffe7a1ea78510e0
Author: liming30 <6950+limin...@users.noreply.github.com>
AuthorDate: Mon Feb 13 19:30:50 2023 +0800

[FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator may be out of 
order when allocating splits

This closes #519
---
 .../source/ContinuousFileSplitEnumerator.java  |  15 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 154 +
 2 files changed, 166 insertions(+), 3 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 2524c30a..a33fac87 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -38,7 +38,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 
 import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
@@ -52,7 +51,7 @@ public class ContinuousFileSplitEnumerator
 
 private final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
-private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
+private final Map<Integer, LinkedList<FileStoreSourceSplit>> bucketSplits;
 
 private Long nextSnapshotId;
 
@@ -91,6 +90,16 @@ public class ContinuousFileSplitEnumerator
 .add(split);
 }
 
+private void addSplitsBack(Collection<FileStoreSourceSplit> splits) {
+new LinkedList<>(splits).descendingIterator().forEachRemaining(this::addSplitToHead);
+}
+
+private void addSplitToHead(FileStoreSourceSplit split) {
+bucketSplits
+.computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
+.addFirst(split);
+}
+
 @Override
 public void start() {
 context.callAsync(
@@ -121,7 +130,7 @@ public class ContinuousFileSplitEnumerator
 @Override
 public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
 LOG.debug("File Source Enumerator adds splits back: {}", splits);
-addSplits(splits);
+addSplitsBack(splits);
 }
 
 @Override
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
new file mode 100644
index ..dfcf0c30
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
+import static 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
+
+/** Unit tests 

[flink] branch release-1.16 updated (f0e0069a741 -> 700f8839126)

2023-02-13 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from f0e0069a741 [FLINK-31031][python] Disable the output buffer of Python 
process
 add 700f8839126 [FLINK-28440][state/changelog] record reference count of 
StreamStateHandle in TaskChangelogRegistry

No new revisions were added by this update.

Summary of changes:
 .../fs/AbstractStateChangeFsUploader.java  |  19 +--
 .../flink/changelog/fs/FsStateChangelogWriter.java |   8 +-
 .../flink/changelog/fs/TaskChangelogRegistry.java  |  28 ++---
 .../changelog/fs/TaskChangelogRegistryImpl.java|  46 ---
 .../fs/DiscardRecordableStateChangeUploader.java   |  73 +++
 .../changelog/fs/FsStateChangelogWriterTest.java   | 140 +
 .../fs/TaskChangelogRegistryImplTest.java  |  22 ++--
 .../fs/TestingBatchingUploadScheduler.java |  41 +++---
 .../state/changelog/ChangelogStateDiscardTest.java |  39 +++---
 9 files changed, 322 insertions(+), 94 deletions(-)
 create mode 100644 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
 copy 
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java
 => 
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingBatchingUploadScheduler.java
 (53%)



[flink] branch release-1.17 updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry

2023-02-13 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new c612575ed33 [FLINK-28440][state/changelog] record reference count of 
StreamStateHandle in TaskChangelogRegistry
c612575ed33 is described below

commit c612575ed339b319d9822dd7cbc59e3d972fe5ed
Author: wangfeifan 
AuthorDate: Wed Feb 1 00:04:50 2023 +0800

[FLINK-28440][state/changelog] record reference count of StreamStateHandle 
in TaskChangelogRegistry

Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com>
---
 .../fs/AbstractStateChangeFsUploader.java  |  19 +--
 .../flink/changelog/fs/FsStateChangelogWriter.java |   8 +-
 .../flink/changelog/fs/TaskChangelogRegistry.java  |  28 ++---
 .../changelog/fs/TaskChangelogRegistryImpl.java|  46 ---
 .../fs/DiscardRecordableStateChangeUploader.java   |  73 +++
 .../changelog/fs/FsStateChangelogWriterTest.java   | 140 +
 .../fs/TaskChangelogRegistryImplTest.java  |  22 ++--
 .../fs/TestingBatchingUploadScheduler.java |  52 
 .../state/changelog/ChangelogStateDiscardTest.java |  39 +++---
 9 files changed, 352 insertions(+), 75 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
index 3dd3dfcc70f..55bdfeac179 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 /** Base implementation of StateChangeUploader. */
 public abstract class AbstractStateChangeFsUploader implements 
StateChangeUploader {
@@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader 
implements StateChangeUpload
 for (UploadTask task : tasks) {
 tasksOffsets.put(task, format.write(stream, task.changeSets));
 }
+
+long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count();
+
 StreamStateHandle handle = stream.getHandle(handleFactory);
-changelogRegistry.startTracking(
-handle,
-tasks.stream()
-.flatMap(t -> t.getChangeSets().stream())
-.map(StateChangeSet::getLogId)
-.collect(Collectors.toSet()));
+changelogRegistry.startTracking(handle, numOfChangeSets);
+
 if (stream instanceof DuplicatingOutputStreamWithPos) {
 StreamStateHandle localHandle =
 ((DuplicatingOutputStreamWithPos) 
stream).getSecondaryHandle(handleFactory);
-changelogRegistry.startTracking(
-localHandle,
-tasks.stream()
-.flatMap(t -> t.getChangeSets().stream())
-.map(StateChangeSet::getLogId)
-.collect(Collectors.toSet()));
+changelogRegistry.startTracking(localHandle, numOfChangeSets);
 return new UploadTasksResult(tasksOffsets, handle, 
localHandle);
 }
 // WARN: streams have to be closed before returning the results
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index f72c50a9d88..83a4b2d5861 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -288,9 +288,9 @@ class FsStateChangelogWriter implements 
StateChangelogWriter 
notUsedState) {
 LOG.trace("Uploaded state to discard: {}", notUsedState);
 for (UploadResult result : notUsedState.values()) {
-changelogRegistry.notUsed(result.streamStateHandle, logId);
+changelogRegistry.release(result.streamStateHandle);
 if (result.localStreamHandle != null) {
-changelogRegistry.notUsed(result.localStreamHandle, logId);
+changelogRegistry.release(result.localStreamHandle);
 }
 }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java
 

[flink] branch master updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry

2023-02-13 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c0aa73df4df [FLINK-28440][state/changelog] record reference count of 
StreamStateHandle in TaskChangelogRegistry
c0aa73df4df is described below

commit c0aa73df4df4e39c138f2cddaeb8efad6c831d03
Author: wangfeifan 
AuthorDate: Wed Feb 1 00:04:50 2023 +0800

[FLINK-28440][state/changelog] record reference count of StreamStateHandle 
in TaskChangelogRegistry

Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com>
---
 .../fs/AbstractStateChangeFsUploader.java  |  19 +--
 .../flink/changelog/fs/FsStateChangelogWriter.java |   8 +-
 .../flink/changelog/fs/TaskChangelogRegistry.java  |  28 ++---
 .../changelog/fs/TaskChangelogRegistryImpl.java|  46 ---
 .../fs/DiscardRecordableStateChangeUploader.java   |  73 +++
 .../changelog/fs/FsStateChangelogWriterTest.java   | 140 +
 .../fs/TaskChangelogRegistryImplTest.java  |  22 ++--
 .../fs/TestingBatchingUploadScheduler.java |  52 
 .../state/changelog/ChangelogStateDiscardTest.java |  39 +++---
 9 files changed, 352 insertions(+), 75 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
index 3dd3dfcc70f..55bdfeac179 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 /** Base implementation of StateChangeUploader. */
 public abstract class AbstractStateChangeFsUploader implements 
StateChangeUploader {
@@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader 
implements StateChangeUpload
 for (UploadTask task : tasks) {
 tasksOffsets.put(task, format.write(stream, task.changeSets));
 }
+
+long numOfChangeSets = tasks.stream().flatMap(t -> 
t.getChangeSets().stream()).count();
+
 StreamStateHandle handle = stream.getHandle(handleFactory);
-changelogRegistry.startTracking(
-handle,
-tasks.stream()
-.flatMap(t -> t.getChangeSets().stream())
-.map(StateChangeSet::getLogId)
-.collect(Collectors.toSet()));
+changelogRegistry.startTracking(handle, numOfChangeSets);
+
 if (stream instanceof DuplicatingOutputStreamWithPos) {
 StreamStateHandle localHandle =
 ((DuplicatingOutputStreamWithPos) 
stream).getSecondaryHandle(handleFactory);
-changelogRegistry.startTracking(
-localHandle,
-tasks.stream()
-.flatMap(t -> t.getChangeSets().stream())
-.map(StateChangeSet::getLogId)
-.collect(Collectors.toSet()));
+changelogRegistry.startTracking(localHandle, numOfChangeSets);
 return new UploadTasksResult(tasksOffsets, handle, 
localHandle);
 }
 // WARN: streams have to be closed before returning the results
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index f72c50a9d88..83a4b2d5861 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -288,9 +288,9 @@ class FsStateChangelogWriter implements 
StateChangelogWriter 
notUsedState) {
 LOG.trace("Uploaded state to discard: {}", notUsedState);
 for (UploadResult result : notUsedState.values()) {
-changelogRegistry.notUsed(result.streamStateHandle, logId);
+changelogRegistry.release(result.streamStateHandle);
 if (result.localStreamHandle != null) {
-changelogRegistry.notUsed(result.localStreamHandle, logId);
+changelogRegistry.release(result.localStreamHandle);
 }
 }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java
 

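The key API shift in the diff above is from startTracking(handle, set-of-log-IDs) to startTracking(handle, useCount): the registry only needs to know how many change sets still reference a StreamStateHandle, and FsStateChangelogWriter now calls release() once per unused upload result, decrementing that count until the handle can be discarded. A minimal sketch of a count-based registry along these lines (hypothetical class and discard logic, not the actual TaskChangelogRegistryImpl):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    /** Illustrative reference-counting registry; names and discard logic are assumptions. */
    class RefCountingHandleRegistry<H> {
        private final Map<H, AtomicLong> refCounts = new ConcurrentHashMap<>();

        /** Begin tracking a handle referenced by the given number of change sets. */
        void startTracking(H handle, long refCount) {
            refCounts.put(handle, new AtomicLong(refCount));
        }

        /** Drop one reference; discard the handle once the count reaches zero. */
        void release(H handle) {
            AtomicLong count = refCounts.get(handle);
            if (count != null && count.decrementAndGet() == 0L) {
                refCounts.remove(handle);
                discard(handle);
            }
        }

        private void discard(H handle) {
            // In the real registry this would delete the underlying state file.
        }
    }
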
[flink] branch release-1.15 updated: [FLINK-31031][python] Disable the output buffer of Python process

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
 new 2fa360ae387 [FLINK-31031][python] Disable the output buffer of Python 
process
2fa360ae387 is described below

commit 2fa360ae387215455b536633808d6a82ca1e17ca
Author: Dian Fu 
AuthorDate: Mon Feb 13 16:22:56 2023 +0800

[FLINK-31031][python] Disable the output buffer of Python process
---
 .../org/apache/flink/client/python/PythonDriver.java |  2 ++
 .../apache/flink/client/python/PythonDriverTest.java | 20 +++-
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index 57b1eb2f32d..ed439415418 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -154,6 +154,8 @@ public final class PythonDriver {
  */
 static List<String> constructPythonCommands(final PythonDriverOptions 
pythonDriverOptions) {
 final List<String> commands = new ArrayList<>();
+// disable output buffer
+commands.add("-u");
 if (pythonDriverOptions.getEntryPointScript().isPresent()) {
 commands.add(pythonDriverOptions.getEntryPointScript().get());
 } else {
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
index 601c68fe16b..1b14bd8b379 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -52,11 +52,12 @@ public class PythonDriverTest {
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions("xxx", null, args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
 // verify the generated commands
-Assert.assertEquals(4, commands.size());
-Assert.assertEquals(commands.get(0), "-m");
-Assert.assertEquals(commands.get(1), "xxx");
-Assert.assertEquals(commands.get(2), "--input");
-Assert.assertEquals(commands.get(3), "in.txt");
+Assert.assertEquals(5, commands.size());
+Assert.assertEquals(commands.get(0), "-u");
+Assert.assertEquals(commands.get(1), "-m");
+Assert.assertEquals(commands.get(2), "xxx");
+Assert.assertEquals(commands.get(3), "--input");
+Assert.assertEquals(commands.get(4), "in.txt");
 }
 
 @Test
@@ -67,9 +68,10 @@ public class PythonDriverTest {
 
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions(null, "xxx.py", args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
-Assert.assertEquals(3, commands.size());
-Assert.assertEquals(commands.get(0), "xxx.py");
-Assert.assertEquals(commands.get(1), "--input");
-Assert.assertEquals(commands.get(2), "in.txt");
+Assert.assertEquals(4, commands.size());
+Assert.assertEquals(commands.get(0), "-u");
+Assert.assertEquals(commands.get(1), "xxx.py");
+Assert.assertEquals(commands.get(2), "--input");
+Assert.assertEquals(commands.get(3), "in.txt");
 }
 }
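
The "-u" argument prepended above is the standard CPython flag that forces unbuffered stdout/stderr, so output from the driver script reaches the parent JVM process (and the logs) promptly instead of sitting in a block buffer until the process exits. A hedged sketch of how such a command list is typically consumed (simplified; the interpreter path and script name are placeholders, not the actual PythonDriver wiring):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    public class UnbufferedPythonLauncher {
        public static void main(String[] args) throws Exception {
            List<String> commands = new ArrayList<>();
            commands.add("python");  // placeholder; PythonDriver resolves the interpreter elsewhere
            commands.add("-u");      // disable output buffering, as in this commit
            commands.add("job.py");  // hypothetical entry-point script
            Process process = new ProcessBuilder(commands).redirectErrorStream(true).start();
            // With -u, each print() in job.py becomes readable here without an explicit flush.
            try (BufferedReader reader =
                    new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                reader.lines().forEach(System.out::println);
            }
            process.waitFor();
        }
    }
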



[flink-web] 02/02: Rebuild website.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit ffaf15279d4d1e73e439248e1338623decaaa762
Author: Weijie Guo 
AuthorDate: Mon Feb 13 16:36:17 2023 +0800

Rebuild website.
---
 content/community.html| 18 --
 content/zh/community.html | 18 --
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/content/community.html b/content/community.html
index bacb8a38a..aac814165 100644
--- a/content/community.html
+++ b/content/community.html
@@ -647,6 +647,18 @@ Thanks,
 PMC, Committer
 gates
   
+  
+<img src="https://avatars3.githubusercontent.com/u/19502505?s=50" class="committer-avatar" />
+Weijie Guo
+Committer
+guoweijie
+  
+  
+<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar" />
+Yangze Guo
+Committer
+guoyangze
+  
   
 <img src="https://avatars1.githubusercontent.com/u/569655?s=50" class="committer-avatar" />
 Greg Hogan
@@ -839,12 +851,6 @@ Thanks,
 Committer
 fengwang
   
-  
-<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar" />
-Yangze Guo
-Committer
-guoyangze
-  
   
 <img src="https://avatars3.githubusercontent.com/u/9400874?s=50" class="committer-avatar" />
 Shaoxuan Wang
diff --git a/content/zh/community.html b/content/zh/community.html
index 5f70ff664..9769d800f 100644
--- a/content/zh/community.html
+++ b/content/zh/community.html
@@ -638,6 +638,18 @@ Thanks,
 PMC, Committer
 gates
   
+  
+<img src="https://avatars3.githubusercontent.com/u/19502505?s=50" class="committer-avatar" />
+Weijie Guo
+Committer
+guoweijie
+  
+  
+<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar" />
+Yangze Guo
+Committer
+guoyangze
+  
   
 <img src="https://avatars1.githubusercontent.com/u/569655?s=50" class="committer-avatar" />
 Greg Hogan
@@ -830,12 +842,6 @@ Thanks,
 Committer
 fengwang
   
-  
-<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar" />
-Yangze Guo
-Committer
-guoyangze
-  
   
 <img src="https://avatars3.githubusercontent.com/u/9400874?s=50" class="committer-avatar" />
 Shaoxuan Wang



[flink-web] branch asf-site updated (7a0c404e3 -> ffaf15279)

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


from 7a0c404e3 [hotfix] Setup staging deployment
 new ff9daa386 Add Weijie Guo to the committer list and fix the incorrect 
order for Yangze Guo.
 new ffaf15279 Rebuild website.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 community.md  | 18 --
 community.zh.md   | 18 --
 content/community.html| 18 --
 content/zh/community.html | 18 --
 4 files changed, 48 insertions(+), 24 deletions(-)



[flink-web] 01/02: Add Weijie Guo to the committer list and fix the incorrect order for Yangze Guo.

2023-02-13 Thread guoweijie
This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit ff9daa38688562444bbd4c86ba757a7ddd235ef9
Author: Weijie Guo 
AuthorDate: Mon Feb 13 12:32:27 2023 +0800

Add Weijie Guo to the committer list and fix the incorrect order for Yangze 
Guo.

This closes #607
---
 community.md| 18 --
 community.zh.md | 18 --
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/community.md b/community.md
index 5915b4ea7..003e6db80 100644
--- a/community.md
+++ b/community.md
@@ -363,6 +363,18 @@ The list below could be outdated. Please find the most 
up-to-date list PMC, Committer
 gates
   
+  
+<img src="https://avatars3.githubusercontent.com/u/19502505?s=50" class="committer-avatar">
+Weijie Guo
+Committer
+guoweijie
+  
+  
+<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar">
+Yangze Guo
+Committer
+guoyangze
+  
   
 <img src="https://avatars1.githubusercontent.com/u/569655?s=50" class="committer-avatar">
 Greg Hogan
@@ -555,12 +567,6 @@ The list below could be outdated. Please find the most 
up-to-date list Committer
 fengwang
   
-  
-<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar">
-Yangze Guo
-Committer
-guoyangze
-  
   
 <img src="https://avatars3.githubusercontent.com/u/9400874?s=50" class="committer-avatar">
 Shaoxuan Wang
diff --git a/community.zh.md b/community.zh.md
index 6b953b941..c07b24fcc 100644
--- a/community.zh.md
+++ b/community.zh.md
@@ -356,6 +356,18 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最
 PMC, Committer
 gates
   
+  
+<img src="https://avatars3.githubusercontent.com/u/19502505?s=50" class="committer-avatar">
+Weijie Guo
+Committer
+guoweijie
+  
+  
+<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar">
+Yangze Guo
+Committer
+guoyangze
+  
   
 <img src="https://avatars1.githubusercontent.com/u/569655?s=50" class="committer-avatar">
 Greg Hogan
@@ -548,12 +560,6 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最
 Committer
 fengwang
   
-  
-<img src="https://avatars3.githubusercontent.com/u/8684799?s=50" class="committer-avatar">
-Yangze Guo
-Committer
-guoyangze
-  
   
 <img src="https://avatars3.githubusercontent.com/u/9400874?s=50" class="committer-avatar">
 Shaoxuan Wang



[flink] branch release-1.16 updated (26a8fe566b7 -> f0e0069a741)

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from 26a8fe566b7 [FLINK-30944] Fix thread leak in 
ExecutionGraphPartitionReleaseTest.
 add f0e0069a741 [FLINK-31031][python] Disable the output buffer of Python 
process

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++
 .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)



[flink] branch release-1.17 updated: [FLINK-31031][python] Disable the output buffer of Python process

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
 new 5344b8a59af [FLINK-31031][python] Disable the output buffer of Python 
process
5344b8a59af is described below

commit 5344b8a59afbb17740ee363f22fe79fe0d5d50b2
Author: Dian Fu 
AuthorDate: Mon Feb 13 16:22:56 2023 +0800

[FLINK-31031][python] Disable the output buffer of Python process
---
 .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++
 .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index 57b1eb2f32d..ed439415418 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -154,6 +154,8 @@ public final class PythonDriver {
  */
 static List<String> constructPythonCommands(final PythonDriverOptions 
pythonDriverOptions) {
 final List<String> commands = new ArrayList<>();
+// disable output buffer
+commands.add("-u");
 if (pythonDriverOptions.getEntryPointScript().isPresent()) {
 commands.add(pythonDriverOptions.getEntryPointScript().get());
 } else {
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
index 6579ce0d9d5..d8b174e7f3b 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -53,7 +53,7 @@ class PythonDriverTest {
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions("xxx", null, args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
 // verify the generated commands
-assertThat(commands).containsExactly("-m", "xxx", "--input", "in.txt");
+assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", 
"in.txt");
 }
 
 @Test
@@ -64,6 +64,6 @@ class PythonDriverTest {
 
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions(null, "xxx.py", args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
-assertThat(commands).containsExactly("xxx.py", "--input", "in.txt");
+assertThat(commands).containsExactly("-u", "xxx.py", "--input", 
"in.txt");
 }
 }



[flink] branch master updated: [FLINK-31031][python] Disable the output buffer of Python process

2023-02-13 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e22f06130a8 [FLINK-31031][python] Disable the output buffer of Python 
process
e22f06130a8 is described below

commit e22f06130a8c663ab454586102c88ba403beee56
Author: Dian Fu 
AuthorDate: Mon Feb 13 16:22:56 2023 +0800

[FLINK-31031][python] Disable the output buffer of Python process
---
 .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++
 .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
index 57b1eb2f32d..ed439415418 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -154,6 +154,8 @@ public final class PythonDriver {
  */
 static List<String> constructPythonCommands(final PythonDriverOptions 
pythonDriverOptions) {
 final List<String> commands = new ArrayList<>();
+// disable output buffer
+commands.add("-u");
 if (pythonDriverOptions.getEntryPointScript().isPresent()) {
 commands.add(pythonDriverOptions.getEntryPointScript().get());
 } else {
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
index 6579ce0d9d5..d8b174e7f3b 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
@@ -53,7 +53,7 @@ class PythonDriverTest {
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions("xxx", null, args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
 // verify the generated commands
-assertThat(commands).containsExactly("-m", "xxx", "--input", "in.txt");
+assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", 
"in.txt");
 }
 
 @Test
@@ -64,6 +64,6 @@ class PythonDriverTest {
 
 PythonDriverOptions pythonDriverOptions = new 
PythonDriverOptions(null, "xxx.py", args);
 List<String> commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions);
-assertThat(commands).containsExactly("xxx.py", "--input", "in.txt");
+assertThat(commands).containsExactly("-u", "xxx.py", "--input", 
"in.txt");
 }
 }



[flink] branch release-1.16 updated (0c0f04769df -> 26a8fe566b7)

2023-02-13 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


from 0c0f04769df [hotfix][doc] Fix typo in elastic_scaling.md
 add 080ddecbb18 [hotfix] Migrate ExecutionGraphPartitionReleaseTest to 
Junit5 and AssertJ.
 add d4242f11bed [hotfix] Introduce Junit5 extension for 
TestingComponentMainThreadExecutor.
 add 26a8fe566b7 [FLINK-30944] Fix thread leak in 
ExecutionGraphPartitionReleaseTest.

No new revisions were added by this update.

Summary of changes:
 .../ExecutionGraphPartitionReleaseTest.java| 70 ++
 .../TestingComponentMainThreadExecutor.java| 39 
 2 files changed, 70 insertions(+), 39 deletions(-)



[flink] 01/03: [hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ.

2023-02-13 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa85ca20ac5919c52a548a069551d04b00885db1
Author: Weijie Guo 
AuthorDate: Wed Feb 8 11:26:26 2023 +0800

[hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ.
---
 .../ExecutionGraphPartitionReleaseTest.java| 56 ++
 1 file changed, 25 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 29711bc9956..5ff3af15b9e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -33,31 +33,27 @@ import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.core.IsEqual.equalTo;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for the interactions of the {@link ExecutionGraph} and {@link
  * PartitionGroupReleaseStrategy}.
  */
-public class ExecutionGraphPartitionReleaseTest extends TestLogger {
+class ExecutionGraphPartitionReleaseTest {
 
-@ClassRule
-public static final TestExecutorResource<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-TestingUtils.defaultExecutorResource();
+@RegisterExtension
+public static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
+TestingUtils.defaultExecutorExtension();
 
 private static final ScheduledExecutorService scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor();
@@ -67,7 +63,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
 scheduledExecutorService));
 
 @Test
-public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() 
throws Exception {
+void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws 
Exception {
 // setup a simple pipeline of 3 operators with blocking partitions
 final JobVertex sourceVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
 final JobVertex operatorVertex = 
ExecutionGraphTestUtils.createNoOpVertex(1);
@@ -98,7 +94,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
 scheduler.updateTaskExecutionState(
 new TaskExecutionState(
 sourceExecution.getAttemptId(), 
ExecutionState.FINISHED));
-assertThat(releasedPartitions, empty());
+assertThat(releasedPartitions).isEmpty();
 });
 
 mainThreadExecutor.execute(
@@ -110,10 +106,9 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
 scheduler.updateTaskExecutionState(
 new TaskExecutionState(
 operatorExecution.getAttemptId(), 
ExecutionState.FINISHED));
-assertThat(releasedPartitions, hasSize(1));
-assertThat(
-releasedPartitions.remove(),
-equalTo(
+assertThat(releasedPartitions).hasSize(1);
+assertThat(releasedPartitions.remove())
+.isEqualTo(
 new ResultPartitionID(
 sourceExecution
 .getVertex()
@@ -121,7 +116,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
 .keySet()
 .iterator()
 .next(),
-sourceExecution.getAttemptId(;
+

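The assertion migration in this commit follows one mechanical pattern, Hamcrest matchers replaced by AssertJ's fluent calls; a small self-contained sketch of the before/after style (class and collection names illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.ArrayDeque;
    import java.util.Queue;

    class AssertjMigrationSketch {
        void example() {
            Queue<String> releasedPartitions = new ArrayDeque<>();
            // Hamcrest style removed by this commit:
            //     assertThat(releasedPartitions, empty());
            //     assertThat(releasedPartitions, hasSize(1));
            // AssertJ style introduced by this commit:
            assertThat(releasedPartitions).isEmpty();
            releasedPartitions.add("partition-0");
            assertThat(releasedPartitions).hasSize(1);
        }
    }
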
[flink] branch release-1.17 updated (19c05ef0c86 -> a5536f2f2f5)

2023-02-13 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


from 19c05ef0c86 [FLINK-30969][python] Remove the __init__.py files under 
the examples
 new fa85ca20ac5 [hotfix] Migrate ExecutionGraphPartitionReleaseTest to 
Junit5 and AssertJ.
 new 8e8aa2aac16 [hotfix] Introduce Junit5 extension for 
TestingComponentMainThreadExecutor.
 new a5536f2f2f5 [FLINK-30944] Fix thread leak in 
ExecutionGraphPartitionReleaseTest.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ExecutionGraphPartitionReleaseTest.java| 70 ++
 .../TestingComponentMainThreadExecutor.java| 39 
 2 files changed, 70 insertions(+), 39 deletions(-)



[flink] 03/03: [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest.

2023-02-13 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a5536f2f2f5b0574d8e8ecb768afd049667a1fba
Author: Weijie Guo 
AuthorDate: Wed Feb 8 11:43:35 2023 +0800

[FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest.
---
 .../executiongraph/ExecutionGraphPartitionReleaseTest.java | 14 ++
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 5ff3af15b9e..ca9fbfbb1ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
@@ -40,7 +39,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -55,12 +53,12 @@ class ExecutionGraphPartitionReleaseTest {
 public static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
 TestingUtils.defaultExecutorExtension();
 
-private static final ScheduledExecutorService scheduledExecutorService =
-Executors.newSingleThreadScheduledExecutor();
-private static final TestingComponentMainThreadExecutor mainThreadExecutor 
=
-new TestingComponentMainThreadExecutor(
-
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
-scheduledExecutorService));
+@RegisterExtension
+public static final TestingComponentMainThreadExecutor.Extension 
MAIN_THREAD_EXTENSION =
+new TestingComponentMainThreadExecutor.Extension();
+
+private final TestingComponentMainThreadExecutor mainThreadExecutor =
+MAIN_THREAD_EXTENSION.getComponentMainThreadTestExecutor();
 
 @Test
 void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws 
Exception {



[flink] 02/03: [hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor.

2023-02-13 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e8aa2aac165620529ef674e7dc5d8e39ddfeae0
Author: Weijie Guo 
AuthorDate: Wed Feb 8 11:42:11 2023 +0800

[hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor.
---
 .../TestingComponentMainThreadExecutor.java| 39 ++
 1 file changed, 39 insertions(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
index 10d4b17ec99..c767fc14d1b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java
@@ -25,6 +25,9 @@ import org.apache.flink.util.function.FunctionUtils;
 import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.rules.ExternalResource;
 
 import javax.annotation.Nonnull;
@@ -103,4 +106,40 @@ public class TestingComponentMainThreadExecutor {
 return componentMainThreadTestExecutor;
 }
 }
+
+/** Test extension for convenience. */
+public static class Extension implements BeforeAllCallback, 
AfterAllCallback {
+private final long shutdownTimeoutMillis;
+
+private TestingComponentMainThreadExecutor 
componentMainThreadTestExecutor;
+
+private ScheduledExecutorService innerExecutorService;
+
+public Extension() {
+this(500L);
+}
+
+public Extension(long shutdownTimeoutMillis) {
+this.shutdownTimeoutMillis = shutdownTimeoutMillis;
+}
+
+@Override
+public void beforeAll(ExtensionContext extensionContext) throws 
Exception {
+this.innerExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+this.componentMainThreadTestExecutor =
+new TestingComponentMainThreadExecutor(
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+innerExecutorService));
+}
+
+@Override
+public void afterAll(ExtensionContext extensionContext) throws 
Exception {
+ExecutorUtils.gracefulShutdown(
+shutdownTimeoutMillis, TimeUnit.MILLISECONDS, 
innerExecutorService);
+}
+
+public TestingComponentMainThreadExecutor 
getComponentMainThreadTestExecutor() {
+return componentMainThreadTestExecutor;
+}
+}
 }
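
With this extension in place, a test class only needs to register it and pull the executor from it, as the 03/03 commit above does; a minimal usage sketch (test class and body are illustrative):

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    class MainThreadUsageSketch {
        @RegisterExtension
        public static final TestingComponentMainThreadExecutor.Extension MAIN_THREAD_EXTENSION =
                new TestingComponentMainThreadExecutor.Extension();

        private final TestingComponentMainThreadExecutor mainThreadExecutor =
                MAIN_THREAD_EXTENSION.getComponentMainThreadTestExecutor();

        @Test
        void runsInMainThread() {
            // execute(...) runs the action in the component's main thread;
            // the inner executor is shut down by the extension in afterAll().
            mainThreadExecutor.execute(() -> {});
        }
    }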