[flink-table-store] branch master updated: [hotfix] Fix BinaryRowTypeSerializer and add test
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new f8cde15a [hotfix] Fix BinaryRowTypeSerializer and add test f8cde15a is described below commit f8cde15ac1b9c221b85fabef6e25429ee766fb25 Author: JingsongLi AuthorDate: Tue Feb 14 15:41:30 2023 +0800 [hotfix] Fix BinaryRowTypeSerializer and add test --- .../store/connector/BinaryRowTypeSerializer.java | 6 ++- .../connector/BinaryRowTypeSerializerTest.java | 57 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java index 3e63a67e..6a4545c7 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializer.java @@ -120,7 +120,11 @@ public final class BinaryRowTypeSerializer extends TypeSerializer { return new BinaryRowTypeSerializerSnapshot(serializer.getArity()); } -private static class BinaryRowTypeSerializerSnapshot +/** + * {@link TypeSerializerSnapshot} for {@link BinaryRow}. It checks the compatibility of + * numFields without checking type. 
+ */ +public static class BinaryRowTypeSerializerSnapshot implements TypeSerializerSnapshot { private int numFields; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java new file mode 100644 index ..8d0c1582 --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BinaryRowTypeSerializerTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.store.data.BinaryRow; + +import java.util.Random; + +import static org.apache.flink.table.store.file.io.DataFileTestUtils.row; + +/** Test for {@link BinaryRowTypeSerializer}. 
*/ +public class BinaryRowTypeSerializerTest extends SerializerTestBase { + +@Override +protected TypeSerializer createSerializer() { +return new BinaryRowTypeSerializer(2); +} + +@Override +protected int getLength() { +return -1; +} + +@Override +protected Class getTypeClass() { +return BinaryRow.class; +} + +@Override +protected BinaryRow[] getTestData() { +Random rnd = new Random(); +return new BinaryRow[] { +row(1, 1), +row(2, 2), +row(rnd.nextInt(), rnd.nextInt()), +row(rnd.nextInt(), rnd.nextInt()) +}; +} +}
[flink-table-store] branch master updated: [FLINK-31032] Supports AND push down for orc format
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 39e8b774 [FLINK-31032] Supports AND push down for orc format 39e8b774 is described below commit 39e8b7744713c3219f83de73405a9f2c2d9c391c Author: Shammon FY AuthorDate: Tue Feb 14 14:22:33 2023 +0800 [FLINK-31032] Supports AND push down for orc format This closes #527 --- .../table/store/format/orc/filter/OrcFilters.java | 28 ++ .../orc/filter/OrcPredicateFunctionVisitor.java| 11 - .../store/format/orc/OrcFilterConverterTest.java | 19 +++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java index 236ce65d..ce18540b 100644 --- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcFilters.java @@ -344,4 +344,32 @@ public class OrcFilters { return "OR(" + Arrays.toString(preds) + ")"; } } + +/** An AND predicate that can be evaluated by the OrcInputFormat. */ +public static class And extends Predicate { +private final Predicate[] preds; + +/** + * Creates an AND predicate. + * + * @param predicates The conjunctive predicates. + */ +public And(Predicate... 
predicates) { +this.preds = predicates; +} + +@Override +public SearchArgument.Builder add(SearchArgument.Builder builder) { +SearchArgument.Builder withAnd = builder.startAnd(); +for (Predicate pred : preds) { +withAnd = pred.add(withAnd); +} +return withAnd.end(); +} + +@Override +public String toString() { +return "AND(" + Arrays.toString(preds) + ")"; +} +} } diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java index ccbb3fee..e1edf0b4 100644 --- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java +++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/orc/filter/OrcPredicateFunctionVisitor.java @@ -123,7 +123,16 @@ public class OrcPredicateFunctionVisitor @Override public Optional visitAnd(List> children) { -return Optional.empty(); +if (children.size() != 2) { +throw new RuntimeException("Illegal and children: " + children.size()); +} + +Optional c1 = children.get(0); +if (!c1.isPresent()) { +return Optional.empty(); +} +Optional c2 = children.get(1); +return c2.map(value -> new OrcFilters.And(c1.get(), value)); } @Override diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java index 702847a2..95af1295 100644 --- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java +++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/orc/OrcFilterConverterTest.java @@ -73,6 +73,25 @@ public class OrcFilterConverterTest { new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1), new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 
2)), new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 3))); + +test( +builder.between(0, 1L, 3L), +new OrcFilters.And( +new OrcFilters.Not( +new OrcFilters.LessThan("long1", PredicateLeaf.Type.LONG, 1)), +new OrcFilters.LessThanEquals("long1", PredicateLeaf.Type.LONG, 3))); + +test( +builder.notIn(0, Arrays.asList(1L, 2L, 3L)), +new OrcFilters.And( +new OrcFilters.And( +new OrcFilters.Not( +new OrcFilters.Equals("long1", PredicateLeaf.Type.LONG, 1)), +new OrcFilters.Not( +new OrcFilters.Equals( +
[flink] branch release-1.17 updated (e1c6352d18a -> 58ca4d49c77)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git from e1c6352d18a [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism new c8f4c8f555c [hotfix] Fix incorrect url link in batch_shuffle.md. new 58ca4d49c77 [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/deployment/elastic_scaling.md | 4 ++-- docs/content.zh/docs/ops/batch/batch_shuffle.md| 18 -- docs/content/docs/deployment/elastic_scaling.md| 4 ++-- docs/content/docs/ops/batch/batch_shuffle.md | 18 -- 4 files changed, 36 insertions(+), 8 deletions(-)
[flink] 02/02: [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 58ca4d49c772bcdc73036a5efe0b6b0039ea75e0 Author: Weijie Guo AuthorDate: Wed Feb 8 15:25:40 2023 +0800 [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler. This closes #21890 --- docs/content.zh/docs/deployment/elastic_scaling.md | 4 ++-- docs/content.zh/docs/ops/batch/batch_shuffle.md| 16 +++- docs/content/docs/deployment/elastic_scaling.md| 4 ++-- docs/content/docs/ops/batch/batch_shuffle.md | 16 +++- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md index 1e7409fb989..c65c974a35b 100644 --- a/docs/content.zh/docs/deployment/elastic_scaling.md +++ b/docs/content.zh/docs/deployment/elastic_scaling.md @@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调 - 根据数据量自动推导并行度可以更好地适应每天变化的数据量 - SQL作业中的算子也可以分配不同的并行度 -当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 。 +当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 `ALL_EXCHANGES_HYBRID_SELECTIVE`。 ### 自动推导并发度 @@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调 ### 局限性 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。 -- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 的作业。 +- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` 和 `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件. - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。 diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md index 9ec955ce27c..fda29f75a01 100644 --- a/docs/content.zh/docs/ops/batch/batch_shuffle.md +++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md @@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies: To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). + Data Consumption Constraints + +Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases: + +- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished. +- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from finished producers. 
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished producers. + +These can be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint). + +- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded. +- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back
[flink] 01/02: [hotfix] Fix incorrect url link in batch_shuffle.md.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit c8f4c8f555cdca20cac49a394a3f7ed3eb1256f1 Author: Weijie Guo AuthorDate: Wed Feb 8 15:27:52 2023 +0800 [hotfix] Fix incorrect url link in batch_shuffle.md. --- docs/content.zh/docs/ops/batch/batch_shuffle.md | 2 +- docs/content/docs/ops/batch/batch_shuffle.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md index 3db398a0ad3..9ec955ce27c 100644 --- a/docs/content.zh/docs/ops/batch/batch_shuffle.md +++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md @@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies: ### Usage -To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). +To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). 
### Limitations diff --git a/docs/content/docs/ops/batch/batch_shuffle.md b/docs/content/docs/ops/batch/batch_shuffle.md index 1ff370339c9..1760ecaf6a7 100644 --- a/docs/content/docs/ops/batch/batch_shuffle.md +++ b/docs/content/docs/ops/batch/batch_shuffle.md @@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies: ### Usage -To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). +To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). ### Limitations
[flink] branch release-1.17 updated (44e6cfb87c1 -> e1c6352d18a)
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a change to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git from 44e6cfb87c1 [FLINK-30962][python] Improve the error message during launching py4j gateway server new c20ebde320b [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable new e1c6352d18a [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../adaptivebatch/AdaptiveBatchScheduler.java | 5 +- ...faultVertexParallelismAndInputInfosDecider.java | 146 +++- .../VertexParallelismAndInputInfosDecider.java | 6 +- .../runtime/scheduler/DefaultSchedulerBuilder.java | 6 +- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 66 - ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++--- .../scheduling/AdaptiveBatchSchedulerITCase.java | 7 +- 7 files changed, 258 insertions(+), 125 deletions(-)
[flink] 01/02: [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit c20ebde320b91ad87f7211ee0fd73416ef2934a6 Author: Lijie Wang AuthorDate: Fri Feb 10 21:42:38 2023 +0800 [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable --- .../apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java index 2dcc33585cd..cab7aa9ba74 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java @@ -84,11 +84,11 @@ class AdaptiveBatchSchedulerITCase { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration); env.setRuntimeMode(RuntimeExecutionMode.BATCH); -env.setParallelism(4); +env.setParallelism(8); final DataStream source = env.fromSequence(0, NUMBERS_TO_PRODUCE - 1) -.setParallelism(4) +.setParallelism(8) .name("source") .slotSharingGroup("group1"); @@ -169,7 +169,8 @@ class AdaptiveBatchSchedulerITCase { configuration.setString(RestOptions.BIND_PORT, "0"); configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L); configuration.setInteger( - BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2); + BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, +DEFAULT_MAX_PARALLELISM); configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, MemorySize.parse("150kb"));
[flink] 02/02: [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit e1c6352d18a19403e3eb80736c58b842de21bc88 Author: Lijie Wang AuthorDate: Mon Feb 6 16:46:35 2023 +0800 [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism This closes #21861 --- .../adaptivebatch/AdaptiveBatchScheduler.java | 5 +- ...faultVertexParallelismAndInputInfosDecider.java | 146 +++- .../VertexParallelismAndInputInfosDecider.java | 6 +- .../runtime/scheduler/DefaultSchedulerBuilder.java | 6 +- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 66 - ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++--- 6 files changed, 254 insertions(+), 122 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 160f080c0fc..8abc880b2c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -320,7 +320,10 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { final ParallelismAndInputInfos parallelismAndInputInfos = vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex( -jobVertex.getJobVertexId(), inputs, parallelism); +jobVertex.getJobVertexId(), +inputs, +parallelism, +jobVertex.getMaxParallelism()); if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { log.info( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java index 00a8eb467c5..e7326562852 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java @@ -75,54 +75,110 @@ public class DefaultVertexParallelismAndInputInfosDecider */ private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768; -private final int maxParallelism; -private final int minParallelism; +private final int globalMaxParallelism; +private final int globalMinParallelism; private final long dataVolumePerTask; -private final int defaultSourceParallelism; +private final int globalDefaultSourceParallelism; private DefaultVertexParallelismAndInputInfosDecider( -int maxParallelism, -int minParallelism, +int globalMaxParallelism, +int globalMinParallelism, MemorySize dataVolumePerTask, -int defaultSourceParallelism) { +int globalDefaultSourceParallelism) { -checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0."); +checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0."); checkArgument( -maxParallelism >= minParallelism, +globalMaxParallelism >= globalMinParallelism, "Maximum parallelism should be greater than or equal to the minimum parallelism."); checkArgument( -defaultSourceParallelism > 0, +globalDefaultSourceParallelism > 0, "The default source parallelism must be larger than 0."); checkNotNull(dataVolumePerTask); -this.maxParallelism = maxParallelism; -this.minParallelism = minParallelism; +this.globalMaxParallelism = globalMaxParallelism; +this.globalMinParallelism = globalMinParallelism; this.dataVolumePerTask = dataVolumePerTask.getBytes(); -this.defaultSourceParallelism = defaultSourceParallelism; +this.globalDefaultSourceParallelism = 
globalDefaultSourceParallelism; } @Override public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List consumedResults, -int initialParallelism) { +int vertexInitialParallelism, +int vertexMaxParallelism) { checkArgument( -initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT -|| initialParallelism > 0); +
[flink] 01/02: [hotfix] Fix incorrect url link in batch_shuffle.md.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b89f060892c388ca5b985babb1383c11347472cc Author: Weijie Guo AuthorDate: Wed Feb 8 15:27:52 2023 +0800 [hotfix] Fix incorrect url link in batch_shuffle.md. --- docs/content.zh/docs/ops/batch/batch_shuffle.md | 2 +- docs/content/docs/ops/batch/batch_shuffle.md| 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md index 3db398a0ad3..9ec955ce27c 100644 --- a/docs/content.zh/docs/ops/batch/batch_shuffle.md +++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md @@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies: ### Usage -To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). +To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). ### Limitations diff --git a/docs/content/docs/ops/batch/batch_shuffle.md b/docs/content/docs/ops/batch/batch_shuffle.md index 1ff370339c9..1760ecaf6a7 100644 --- a/docs/content/docs/ops/batch/batch_shuffle.md +++ b/docs/content/docs/ops/batch/batch_shuffle.md @@ -112,7 +112,7 @@ Hybrid shuffle provides two spilling strategies: ### Usage -To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). 
+To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). ### Limitations
[flink] branch master updated (b3998324b68 -> 5ceda3da20e)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from b3998324b68 [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism new b89f060892c [hotfix] Fix incorrect url link in batch_shuffle.md. new 5ceda3da20e [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/content.zh/docs/deployment/elastic_scaling.md | 4 ++-- docs/content.zh/docs/ops/batch/batch_shuffle.md| 18 -- docs/content/docs/deployment/elastic_scaling.md| 4 ++-- docs/content/docs/ops/batch/batch_shuffle.md | 18 -- 4 files changed, 36 insertions(+), 8 deletions(-)
[flink] 02/02: [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5ceda3da20ec5ed50849d89a629744fe9d4010d8 Author: Weijie Guo AuthorDate: Wed Feb 8 15:25:40 2023 +0800 [FLINK-30860][doc] Add document for hybrid shuffle with adaptive batch scheduler. This closes #21890 --- docs/content.zh/docs/deployment/elastic_scaling.md | 4 ++-- docs/content.zh/docs/ops/batch/batch_shuffle.md| 16 +++- docs/content/docs/deployment/elastic_scaling.md| 4 ++-- docs/content/docs/ops/batch/batch_shuffle.md | 16 +++- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md index 1e7409fb989..c65c974a35b 100644 --- a/docs/content.zh/docs/deployment/elastic_scaling.md +++ b/docs/content.zh/docs/deployment/elastic_scaling.md @@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调 - 根据数据量自动推导并行度可以更好地适应每天变化的数据量 - SQL作业中的算子也可以分配不同的并行度 -当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 。 +当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 `ALL_EXCHANGES_HYBRID_SELECTIVE`。 ### 自动推导并发度 @@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调 ### 局限性 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。 -- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" 
>}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 的作业。 +- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` 和 `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件. - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。 diff --git a/docs/content.zh/docs/ops/batch/batch_shuffle.md b/docs/content.zh/docs/ops/batch/batch_shuffle.md index 9ec955ce27c..fda29f75a01 100644 --- a/docs/content.zh/docs/ops/batch/batch_shuffle.md +++ b/docs/content.zh/docs/ops/batch/batch_shuffle.md @@ -114,12 +114,25 @@ Hybrid shuffle provides two spilling strategies: To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy). + Data Consumption Constraints + +Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases: + +- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished. +- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from finished producers. 
+- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished producers. + +These can be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint). + +- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded. +- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back to
[flink] 02/02: [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b3998324b685afc779954f7e54cc0d8f281267ec Author: Lijie Wang AuthorDate: Mon Feb 6 16:46:35 2023 +0800 [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism This closes #21861 --- .../adaptivebatch/AdaptiveBatchScheduler.java | 5 +- ...faultVertexParallelismAndInputInfosDecider.java | 146 +++- .../VertexParallelismAndInputInfosDecider.java | 6 +- .../runtime/scheduler/DefaultSchedulerBuilder.java | 6 +- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 66 - ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++--- 6 files changed, 254 insertions(+), 122 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 160f080c0fc..8abc880b2c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -320,7 +320,10 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { final ParallelismAndInputInfos parallelismAndInputInfos = vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex( -jobVertex.getJobVertexId(), inputs, parallelism); +jobVertex.getJobVertexId(), +inputs, +parallelism, +jobVertex.getMaxParallelism()); if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { log.info( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java index 00a8eb467c5..e7326562852 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java @@ -75,54 +75,110 @@ public class DefaultVertexParallelismAndInputInfosDecider */ private static final int MAX_NUM_SUBPARTITIONS_PER_TASK_CONSUME = 32768; -private final int maxParallelism; -private final int minParallelism; +private final int globalMaxParallelism; +private final int globalMinParallelism; private final long dataVolumePerTask; -private final int defaultSourceParallelism; +private final int globalDefaultSourceParallelism; private DefaultVertexParallelismAndInputInfosDecider( -int maxParallelism, -int minParallelism, +int globalMaxParallelism, +int globalMinParallelism, MemorySize dataVolumePerTask, -int defaultSourceParallelism) { +int globalDefaultSourceParallelism) { -checkArgument(minParallelism > 0, "The minimum parallelism must be larger than 0."); +checkArgument(globalMinParallelism > 0, "The minimum parallelism must be larger than 0."); checkArgument( -maxParallelism >= minParallelism, +globalMaxParallelism >= globalMinParallelism, "Maximum parallelism should be greater than or equal to the minimum parallelism."); checkArgument( -defaultSourceParallelism > 0, +globalDefaultSourceParallelism > 0, "The default source parallelism must be larger than 0."); checkNotNull(dataVolumePerTask); -this.maxParallelism = maxParallelism; -this.minParallelism = minParallelism; +this.globalMaxParallelism = globalMaxParallelism; +this.globalMinParallelism = globalMinParallelism; this.dataVolumePerTask = dataVolumePerTask.getBytes(); -this.defaultSourceParallelism = defaultSourceParallelism; +this.globalDefaultSourceParallelism = 
globalDefaultSourceParallelism; } @Override public ParallelismAndInputInfos decideParallelismAndInputInfosForVertex( JobVertexID jobVertexId, List consumedResults, -int initialParallelism) { +int vertexInitialParallelism, +int vertexMaxParallelism) { checkArgument( -initialParallelism == ExecutionConfig.PARALLELISM_DEFAULT -|| initialParallelism > 0); +
[flink] branch master updated (e67b141f2b2 -> b3998324b68)
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from e67b141f2b2 [FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893) new 2fc90245f57 [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable new b3998324b68 [FLINK-30917][runtime] Let adaptive batch scheduler also respect the user-configured max parallelism when deciding parallelism The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../adaptivebatch/AdaptiveBatchScheduler.java | 5 +- ...faultVertexParallelismAndInputInfosDecider.java | 146 +++- .../VertexParallelismAndInputInfosDecider.java | 6 +- .../runtime/scheduler/DefaultSchedulerBuilder.java | 6 +- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 66 - ...tVertexParallelismAndInputInfosDeciderTest.java | 147 ++--- .../scheduling/AdaptiveBatchSchedulerITCase.java | 7 +- 7 files changed, 258 insertions(+), 125 deletions(-)
[flink] 01/02: [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable
This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 2fc90245f570931f75924f4704f5d681ac88ffe4 Author: Lijie Wang AuthorDate: Fri Feb 10 21:42:38 2023 +0800 [hotfix] Adjust the parameters of AdaptiveBatchSchedulerITCase to make tests more reasonable --- .../apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java index 2dcc33585cd..cab7aa9ba74 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java @@ -84,11 +84,11 @@ class AdaptiveBatchSchedulerITCase { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(configuration); env.setRuntimeMode(RuntimeExecutionMode.BATCH); -env.setParallelism(4); +env.setParallelism(8); final DataStream source = env.fromSequence(0, NUMBERS_TO_PRODUCE - 1) -.setParallelism(4) +.setParallelism(8) .name("source") .slotSharingGroup("group1"); @@ -169,7 +169,8 @@ class AdaptiveBatchSchedulerITCase { configuration.setString(RestOptions.BIND_PORT, "0"); configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L); configuration.setInteger( - BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2); + BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, +DEFAULT_MAX_PARALLELISM); configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, MemorySize.parse("150kb"));
[flink-table-store] branch master updated: [FLINK-31027] Introduce annotations for table store
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 706a92ff [FLINK-31027] Introduce annotations for table store 706a92ff is described below commit 706a92ffeea7cdf8467d7839aee86132eae50eb5 Author: Shammon FY AuthorDate: Tue Feb 14 11:02:56 2023 +0800 [FLINK-31027] Introduce annotations for table store This closes #526 --- .../utils/BenchmarkGlobalConfiguration.java| 2 -- .../table/store/annotation/Documentation.java | 4 --- .../table/store/annotation/VisibleForTesting.java | 31 +- .../flink/table/store/data/BinaryArrayWriter.java | 2 -- .../flink/table/store/data/BinaryRowWriter.java| 2 -- .../store/data/columnar/ByteColumnVector.java | 3 --- .../store/data/columnar/BytesColumnVector.java | 3 --- .../table/store/data/columnar/ColumnVector.java| 3 --- .../store/data/columnar/DoubleColumnVector.java| 3 --- .../store/data/columnar/FloatColumnVector.java | 3 --- .../table/store/data/columnar/IntColumnVector.java | 3 --- .../store/data/columnar/LongColumnVector.java | 3 --- .../table/store/data/columnar/RowColumnVector.java | 3 --- .../store/data/columnar/ShortColumnVector.java | 3 --- .../data/columnar/heap/AbstractHeapVector.java | 2 -- .../data/columnar/heap/HeapBooleanVector.java | 2 -- .../store/data/columnar/heap/HeapByteVector.java | 2 -- .../store/data/columnar/heap/HeapBytesVector.java | 2 -- .../store/data/columnar/heap/HeapDoubleVector.java | 2 -- .../store/data/columnar/heap/HeapFloatVector.java | 2 -- .../store/data/columnar/heap/HeapIntVector.java| 2 -- .../store/data/columnar/heap/HeapLongVector.java | 2 -- .../store/data/columnar/heap/HeapRowVector.java| 2 -- .../store/data/columnar/heap/HeapShortVector.java | 2 -- .../columnar/writable/AbstractWritableVector.java | 2 -- .../columnar/writable/WritableBooleanVector.java | 2 -- 
.../data/columnar/writable/WritableByteVector.java | 2 -- .../columnar/writable/WritableBytesVector.java | 2 -- .../columnar/writable/WritableColumnVector.java| 2 -- .../columnar/writable/WritableDoubleVector.java| 2 -- .../columnar/writable/WritableFloatVector.java | 2 -- .../data/columnar/writable/WritableIntVector.java | 2 -- .../data/columnar/writable/WritableLongVector.java | 2 -- .../columnar/writable/WritableShortVector.java | 2 -- .../table/store/file/utils/RecyclableIterator.java | 3 --- .../flink/table/store/fs/hadoop/HadoopFileIO.java | 2 +- .../apache/flink/table/store/io/DataInputView.java | 2 -- .../flink/table/store/io/DataInputViewStream.java | 3 --- .../table/store/io/DataInputViewStreamWrapper.java | 3 --- .../flink/table/store/io/DataOutputViewStream.java | 3 --- .../store/io/DataOutputViewStreamWrapper.java | 3 --- .../table/store/memory/MemorySegmentUtils.java | 2 +- .../store/options/description/Description.java | 3 --- .../table/store/plugin/ComponentClassLoader.java | 2 +- .../apache/flink/table/store/types/DateType.java | 3 --- .../flink/table/store/utils/OperatingSystem.java | 3 --- .../flink/table/store/utils/Preconditions.java | 3 --- .../flink/table/store/utils/ThrowingConsumer.java | 3 --- .../flink/table/store/datagen/DataGenerator.java | 3 --- .../store/datagen/DataGeneratorContainer.java | 2 -- .../table/store/datagen/DataGeneratorMapper.java | 2 -- .../flink/table/store/datagen/RandomGenerator.java | 3 --- .../store/datagen/RandomGeneratorVisitor.java | 2 -- .../flink/table/store/types/DataTypeAssert.java| 2 -- .../store/connector/AbstractTableStoreFactory.java | 2 +- .../flink/table/store/connector/FlinkCatalog.java | 2 +- .../table/store/kafka/KafkaLogSinkProvider.java| 2 +- .../table/store/kafka/KafkaLogSourceProvider.java | 2 +- .../flink/table/store/file/AbstractFileStore.java | 2 +- .../apache/flink/table/store/file/KeyValue.java| 2 +- .../file/append/AppendOnlyCompactManager.java | 2 +- 
.../table/store/file/catalog/CatalogLock.java | 3 --- .../table/store/file/io/DataFilePathFactory.java | 2 +- .../store/file/io/KeyValueFileWriterFactory.java | 2 +- .../table/store/file/io/RollingFileWriter.java | 2 +- .../table/store/file/manifest/ManifestFile.java| 2 +- .../store/file/mergetree/MergeTreeWriter.java | 2 +- .../file/mergetree/SortBufferWriteBuffer.java | 2 +- .../table/store/file/mergetree/SortedRun.java | 2 +- .../mergetree/compact/MergeTreeCompactManager.java | 2 +- .../mergetree/compact/UniversalCompaction.java | 2 +- .../file/operation/AbstractFileStoreWrite.java | 2 +- .../store/file/operation/FileStoreExpireImpl.java | 2 +-
[flink-connector-pulsar] branch main updated: [FLINK-31048][Connector/Pulsar] Fix incorrect java doc in PulsarSource and PulsarSourceBuilder.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git The following commit(s) were added to refs/heads/main by this push: new fb98096 [FLINK-31048][Connector/Pulsar] Fix incorrect java doc in PulsarSource and PulsarSourceBuilder. fb98096 is described below commit fb98096d9a67d26b1965b8ef181125317a59a5c4 Author: Weijie Guo AuthorDate: Tue Feb 14 01:18:06 2023 +0800 [FLINK-31048][Connector/Pulsar] Fix incorrect java doc in PulsarSource and PulsarSourceBuilder. This closes #28 --- .../java/org/apache/flink/connector/pulsar/source/PulsarSource.java | 2 +- .../org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java index c7d272b..c038f32 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java @@ -57,7 +57,7 @@ import org.apache.pulsar.client.api.PulsarClientException; * .setServiceUrl(getServiceUrl()) * .setAdminUrl(getAdminUrl()) * .setSubscriptionName("test") - * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) + * .setDeserializationSchema(new SimpleStringSchema()) * .setBounded(StopCursor::defaultStopCursor) * .build(); * } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 80d8c30..1c8cfbe 100644 --- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -89,7 +89,7 @@ import static org.apache.flink.util.Preconditions.checkState; * .setAdminUrl(PULSAR_BROKER_HTTP_URL) * .setSubscriptionName("flink-source-1") * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) + * .setDeserializationSchema(new SimpleStringSchema()) * .build(); * } * @@ -118,7 +118,7 @@ import static org.apache.flink.util.Preconditions.checkState; * .setAdminUrl(PULSAR_BROKER_HTTP_URL) * .setSubscriptionName("flink-source-1") * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) + * .setDeserializationSchema(new SimpleStringSchema()) * .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis())) * .build(); * }
[flink] branch master updated: [FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e67b141f2b2 [FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893) e67b141f2b2 is described below commit e67b141f2b2414cf9b6ddc43e817d4250dd38319 Author: Juntao Hu AuthorDate: Mon Feb 13 18:51:20 2023 -0800 [FLINK-30961][python] Remove deprecated "tests_require" in setup.py (#21893) --- flink-python/dev/dev-requirements.txt | 1 + flink-python/setup.py | 1 - flink-python/tox.ini | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index ff19b6f1f15..f36142c0fb2 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -31,3 +31,4 @@ grpcio-tools>=1.29.0,<=1.46.3 pemja==0.3.0; platform_system != 'Windows' httplib2>=0.19.0,<=0.20.4 protobuf>=3.19.0,<=3.21 +pytest~=7.0 diff --git a/flink-python/setup.py b/flink-python/setup.py index 9e0affe6c68..893127d232e 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -330,7 +330,6 @@ try: python_requires='>=3.7', install_requires=install_requires, cmdclass={'build_ext': build_ext}, -tests_require=['pytest==4.4.1'], description='Apache Flink Python API', long_description=long_description, long_description_content_type='text/markdown', diff --git a/flink-python/tox.ini b/flink-python/tox.ini index 78f82a65cf6..ac8573a69dc 100644 --- a/flink-python/tox.ini +++ b/flink-python/tox.ini @@ -28,7 +28,6 @@ whitelist_externals= /bin/bash deps = -r dev/dev-requirements.txt -pytest apache-flink-libraries passenv = * commands =
[flink] branch release-1.15 updated: [FLINK-30962][python] Improve the error message during launching py4j gateway server
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new b6cbe5403d8 [FLINK-30962][python] Improve the error message during launching py4j gateway server b6cbe5403d8 is described below commit b6cbe5403d86d42b26b2105d5be9d016384bf736 Author: Juntao Hu AuthorDate: Wed Feb 8 18:05:30 2023 +0800 [FLINK-30962][python] Improve the error message during launching py4j gateway server This closes #21894. --- flink-python/pyflink/java_gateway.py | 6 +- flink-python/pyflink/pyflink_gateway_server.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index fdb47a8c754..963b7454439 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -109,7 +109,11 @@ def launch_gateway(): time.sleep(0.1) if not os.path.isfile(conn_info_file): -raise Exception("Java gateway process exited before sending its port number") +stderr_info = p.stderr.read().decode('utf-8') +raise RuntimeError( +"Java gateway process exited before sending its port number.\nStderr:\n" ++ stderr_info +) with open(conn_info_file, "rb") as info: gateway_port = struct.unpack("!I", info.read(4))[0] diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py index 60301da3843..b685c0922c2 100644 --- a/flink-python/pyflink/pyflink_gateway_server.py +++ b/flink-python/pyflink/pyflink_gateway_server.py @@ -323,7 +323,7 @@ def launch_gateway_server_process(env, args): signal.signal(signal.SIGINT, signal.SIG_IGN) preexec_fn = preexec_func return Popen(list(filter(lambda c: len(c) != 0, command)), - stdin=PIPE, preexec_fn=preexec_fn, env=env) + stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env) if __name__ == "__main__":
[flink] branch release-1.16 updated (c0d52c09340 -> c57699cb471)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from c0d52c09340 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose add c57699cb471 [FLINK-30962][python] Improve the error message during launching py4j gateway server No new revisions were added by this update. Summary of changes: flink-python/pyflink/java_gateway.py | 6 +- flink-python/pyflink/pyflink_gateway_server.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-)
[flink] branch release-1.17 updated: [FLINK-30962][python] Improve the error message during launching py4j gateway server
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 44e6cfb87c1 [FLINK-30962][python] Improve the error message during launching py4j gateway server 44e6cfb87c1 is described below commit 44e6cfb87c1b2a5f4106df61cd06c4870d4802f8 Author: Juntao Hu AuthorDate: Wed Feb 8 18:05:30 2023 +0800 [FLINK-30962][python] Improve the error message during launching py4j gateway server This closes #21894. --- flink-python/pyflink/java_gateway.py | 6 +- flink-python/pyflink/pyflink_gateway_server.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index bdcb57e8094..4bd1c3c858e 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -109,7 +109,11 @@ def launch_gateway(): time.sleep(0.1) if not os.path.isfile(conn_info_file): -raise Exception("Java gateway process exited before sending its port number") +stderr_info = p.stderr.read().decode('utf-8') +raise RuntimeError( +"Java gateway process exited before sending its port number.\nStderr:\n" ++ stderr_info +) with open(conn_info_file, "rb") as info: gateway_port = struct.unpack("!I", info.read(4))[0] diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py index f7947d7897c..fdb791120ef 100644 --- a/flink-python/pyflink/pyflink_gateway_server.py +++ b/flink-python/pyflink/pyflink_gateway_server.py @@ -261,7 +261,7 @@ def launch_gateway_server_process(env, args): signal.signal(signal.SIGINT, signal.SIG_IGN) preexec_fn = preexec_func return Popen(list(filter(lambda c: len(c) != 0, command)), - stdin=PIPE, preexec_fn=preexec_fn, env=env) + stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env) if __name__ == "__main__":
[flink] branch master updated (3a647d6bec7 -> 50c522f1d81)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git from 3a647d6bec7 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose add 50c522f1d81 [FLINK-30962][python] Improve the error message during launching py4j gateway server No new revisions were added by this update. Summary of changes: flink-python/pyflink/java_gateway.py | 6 +- flink-python/pyflink/pyflink_gateway_server.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-)
[flink] branch release-1.16 updated (0994832be8a -> c0d52c09340)
This is an automated email from the ASF dual-hosted git repository. czweng pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 0994832be8a [FLINK-30640][sql-client] Fix unstable ctas test in CliClientITCase (#21896) add c0d52c09340 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose No new revisions were added by this update. Summary of changes: .../org/apache/flink/table/api/TableITCase.scala | 37 -- 1 file changed, 21 insertions(+), 16 deletions(-)
[flink] branch release-1.17 updated: [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose
This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new bb66b989188 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose bb66b989188 is described below commit bb66b989188c661d9d7847b6a5728697a3a2dcd2 Author: Matthias Pohl AuthorDate: Tue Feb 14 03:26:19 2023 +0100 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose This closes #21921. --- .../org/apache/flink/table/api/TableITCase.scala | 37 -- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala index e108c98e265..e09c6274aab 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala @@ -24,6 +24,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.catalog.{Column, ResolvedSchema} import org.apache.flink.table.planner.utils.TestTableSourceSinks import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.test.util.AbstractTestBase.MINI_CLUSTER_RESOURCE import org.apache.flink.types.{Row, RowKind} import org.apache.flink.util.CollectionUtil @@ -32,7 +33,7 @@ import _root_.java.util import org.hamcrest.MatcherAssert.assertThat import org.hamcrest.Matchers.containsInAnyOrder import org.junit.{Before, Rule, Test} -import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} +import org.junit.Assert.{assertEquals, assertTrue} import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -108,28 +109,32 @@ class 
TableITCase(tableEnvName: String, isStreaming: Boolean) extends AbstractTe @Test def testCollectWithClose(): Unit = { -val query = +val sourceDdl = """ -|select id, concat(concat(`first`, ' '), `last`) as `full name` -|from MyTable where mod(id, 2) = 0 - """.stripMargin +|create table unbounded_source ( +| id int +|) with ( +| 'connector' = 'datagen', +| 'number-of-rows' = '1', +| 'rows-per-second' = '1' -- slow producing speed to make sure that +| -- source is not finished when job is cancelled +|) +|""".stripMargin +tEnv.executeSql(sourceDdl) +val query = "select id from unbounded_source where mod(id, 2) = 0" val table = tEnv.sqlQuery(query) val tableResult = table.execute() assertTrue(tableResult.getJobClient.isPresent) assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) val it = tableResult.collect() it.close() -val jobStatus = - try { -Some(tableResult.getJobClient.get().getJobStatus.get()) - } catch { -// ignore the exception, -// because the MiniCluster maybe already been shut down when getting job status -case _: Throwable => None - } -if (jobStatus.isDefined) { - assertNotEquals(JobStatus.RUNNING, jobStatus.get) -} + +// wait for mini cluster to shut down +val jobClient = tableResult.getJobClient.get() +val jobId = jobClient.getJobID +MINI_CLUSTER_RESOURCE.getClusterClient.requestJobResult(jobId).get() + +assertEquals(JobStatus.CANCELED, jobClient.getJobStatus.get()) } @Test
[flink] branch master updated: [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose
This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3a647d6bec7 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose 3a647d6bec7 is described below commit 3a647d6bec7d0413bcbc71668ed45be57bb4bfe6 Author: tsreaper AuthorDate: Tue Feb 14 10:25:47 2023 +0800 [FLINK-25813][table-planner] Fix unstable test TableITCase#testCollectWithClose This closes #18893. --- .../org/apache/flink/table/api/TableITCase.scala | 37 -- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala index e108c98e265..e09c6274aab 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala @@ -24,6 +24,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.catalog.{Column, ResolvedSchema} import org.apache.flink.table.planner.utils.TestTableSourceSinks import org.apache.flink.test.util.AbstractTestBase +import org.apache.flink.test.util.AbstractTestBase.MINI_CLUSTER_RESOURCE import org.apache.flink.types.{Row, RowKind} import org.apache.flink.util.CollectionUtil @@ -32,7 +33,7 @@ import _root_.java.util import org.hamcrest.MatcherAssert.assertThat import org.hamcrest.Matchers.containsInAnyOrder import org.junit.{Before, Rule, Test} -import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} +import org.junit.Assert.{assertEquals, assertTrue} import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -108,28 +109,32 @@ class TableITCase(tableEnvName: 
String, isStreaming: Boolean) extends AbstractTe @Test def testCollectWithClose(): Unit = { -val query = +val sourceDdl = """ -|select id, concat(concat(`first`, ' '), `last`) as `full name` -|from MyTable where mod(id, 2) = 0 - """.stripMargin +|create table unbounded_source ( +| id int +|) with ( +| 'connector' = 'datagen', +| 'number-of-rows' = '1', +| 'rows-per-second' = '1' -- slow producing speed to make sure that +| -- source is not finished when job is cancelled +|) +|""".stripMargin +tEnv.executeSql(sourceDdl) +val query = "select id from unbounded_source where mod(id, 2) = 0" val table = tEnv.sqlQuery(query) val tableResult = table.execute() assertTrue(tableResult.getJobClient.isPresent) assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) val it = tableResult.collect() it.close() -val jobStatus = - try { -Some(tableResult.getJobClient.get().getJobStatus.get()) - } catch { -// ignore the exception, -// because the MiniCluster maybe already been shut down when getting job status -case _: Throwable => None - } -if (jobStatus.isDefined) { - assertNotEquals(JobStatus.RUNNING, jobStatus.get) -} + +// wait for mini cluster to shut down +val jobClient = tableResult.getJobClient.get() +val jobId = jobClient.getJobID +MINI_CLUSTER_RESOURCE.getClusterClient.requestJobResult(jobId).get() + +assertEquals(JobStatus.CANCELED, jobClient.getJobStatus.get()) } @Test
[flink-table-store] branch master updated: [hotfix] Fix options in FlinkSinkBuilder
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new aba5e166 [hotfix] Fix options in FlinkSinkBuilder aba5e166 is described below commit aba5e166b69d1a8437222988f777fa004268980f Author: Shammon FY AuthorDate: Tue Feb 14 10:20:48 2023 +0800 [hotfix] Fix options in FlinkSinkBuilder This closes #530 --- .../apache/flink/table/store/connector/sink/FlinkSinkBuilder.java | 2 +- .../table/store/connector/sink/FileStoreShuffleBucketTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java index 7d0e570c..7358201d 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java @@ -89,7 +89,7 @@ public class FlinkSinkBuilder { table.schema(), table.options() .toConfiguration() - .getBoolean(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION)); + .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION)); PartitionTransformation partitioned = new PartitionTransformation<>(input.getTransformation(), partitioner); if (parallelism != null) { diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java index c7588063..eeff959b 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java +++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java @@ -36,8 +36,8 @@ import org.apache.flink.table.store.table.FileStoreTableFactory; import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.types.logical.RowType; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.ArrayList; @@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class FileStoreShuffleBucketTest extends CatalogITCaseBase { private static final int TOTAL_SOURCE_RECORD_COUNT = 1000; -@Before +@BeforeEach public void after() throws Exception { super.before(); CollectStoreSinkWrite.writeRowsMap.clear();
[flink] branch release-1.16 updated (700f8839126 -> 0994832be8a)
This is an automated email from the ASF dual-hosted git repository. shengkai pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 700f8839126 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry add 0994832be8a [FLINK-30640][sql-client] Fix unstable ctas test in CliClientITCase (#21896) No new revisions were added by this update. Summary of changes: flink-table/flink-sql-client/src/test/resources/sql/set.q | 11 +-- 1 file changed, 9 insertions(+), 2 deletions(-)
[flink-table-store] branch master updated: [FLINK-30997] Refactor tests in connector to extends AbstractTestBase
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 74cc7ce1 [FLINK-30997] Refactor tests in connector to extends AbstractTestBase 74cc7ce1 is described below commit 74cc7ce1e516e96349021fa7a4c96f2e992d1b28 Author: Shammon FY AuthorDate: Tue Feb 14 09:30:01 2023 +0800 [FLINK-30997] Refactor tests in connector to extends AbstractTestBase This closes #512 --- .../store/connector/AppendOnlyTableITCase.java | 2 +- .../store/connector/BatchFileStoreITCase.java | 2 +- .../table/store/connector/CatalogITCaseBase.java | 17 +- .../table/store/connector/CatalogTableITCase.java | 7 +- ...AndMultiPartitionedTableWIthKafkaLogITCase.java | 6 +- .../ComputedColumnAndWatermarkTableITCase.java | 6 +- .../store/connector/ContinuousFileStoreITCase.java | 35 ++-- .../table/store/connector/FileStoreITCase.java | 50 ++--- .../store/connector/FileSystemCatalogITCase.java | 16 +- .../store/connector/ForceCompactionITCase.java | 2 +- .../connector/FullCompactionFileStoreITCase.java | 6 +- .../table/store/connector/LargeDataITCase.java | 2 +- .../table/store/connector/LogSystemITCase.java | 8 +- .../table/store/connector/LookupJoinITCase.java| 10 +- .../table/store/connector/MappingTableITCase.java | 10 +- .../table/store/connector/PartialUpdateITCase.java | 2 +- .../store/connector/PreAggregationITCase.java | 2 +- .../table/store/connector/PredicateITCase.java | 2 +- .../table/store/connector/RescaleBucketITCase.java | 28 ++- .../table/store/connector/SchemaChangeITCase.java | 2 +- .../StreamingReadWriteTableWithKafkaLogITCase.java | 10 +- .../store/connector/StreamingWarehouseITCase.java | 2 +- .../store/connector/sink/CompactorSinkITCase.java | 27 +-- .../store/connector/sink/SinkSavepointITCase.java | 53 +++-- .../connector/source/CompactorSourceITCase.java| 10 +- 
.../store/connector/util/AbstractTestBase.java | 22 +- .../util/MiniClusterWithClientExtension.java | 225 + .../table/store/kafka/KafkaTableTestBase.java | 71 --- 28 files changed, 449 insertions(+), 186 deletions(-) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java index 757934eb..7bc17944 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java @@ -23,7 +23,7 @@ import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java index 7fdd2278..b6ab0591 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/BatchFileStoreITCase.java @@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.types.Row; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java index cc916ddb..6b03d548 100644 --- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java @@ -30,22 +30,22 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; +import org.apache.flink.table.store.connector.util.AbstractTestBase; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.table.store.fs.Path; import org.apache.flink.table.store.fs.local.LocalFileIO; -import
[flink-table-store] branch master updated: [FLINK-30979] Support shuffling data by partition
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 219c4de5 [FLINK-30979] Support shuffling data by partition 219c4de5 is described below commit 219c4de518c419eade41e99fc419d7f2ccfed213 Author: Shammon FY AuthorDate: Tue Feb 14 09:28:47 2023 +0800 [FLINK-30979] Support shuffling data by partition This closes #522 --- docs/content/docs/how-to/writing-tables.md | 3 + .../generated/flink_connector_configuration.html | 6 + .../store/connector/FlinkConnectorOptions.java | 7 + .../connector/sink/BucketStreamPartitioner.java| 32 +++- .../table/store/connector/sink/FlinkSink.java | 9 +- .../store/connector/sink/FlinkSinkBuilder.java | 23 ++- .../connector/sink/FileStoreShuffleBucketTest.java | 163 + .../table/store/table/sink/PartitionComputer.java | 43 ++ 8 files changed, 273 insertions(+), 13 deletions(-) diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md index dcdfae3c..14c19c42 100644 --- a/docs/content/docs/how-to/writing-tables.md +++ b/docs/content/docs/how-to/writing-tables.md @@ -96,6 +96,9 @@ Use `INSERT INTO` to apply records and changes to tables. INSERT INTO MyTable SELECT ... ``` +Table Store supports shuffle data by bucket in sink phase. To improve data skew, Table Store also +supports shuffling data by partition fields. You can add option `sink.partition-shuffle` to the table. 
+ {{< /tab >}} {{< tab "Spark3" >}} diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index d5d40b36..6431a767 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -26,5 +26,11 @@ Integer Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. + +sink.partition-shuffle +false +Boolean +The option to enable shuffle data by dynamic partition fields in sink phase for table store. + diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java index 127269e2..504cdb43 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java @@ -62,6 +62,13 @@ public class FlinkConnectorOptions { + "By default, if this option is not defined, the planner will derive the parallelism " + "for each statement individually by also considering the global configuration."); +public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = +ConfigOptions.key("sink.partition-shuffle") +.booleanType() +.defaultValue(false) +.withDescription( +"The option to enable shuffle data by dynamic partition fields in sink phase for table store."); + public static final ConfigOption<Integer> SCAN_PARALLELISM = ConfigOptions.key("scan.parallelism") .intType() diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java index 7b3d6d29..d8bd0c20 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java @@ -24,32 +24,48 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.connector.FlinkRowWrapper; +import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.table.sink.BucketComputer; +import org.apache.flink.table.store.table.sink.PartitionComputer; + +import java.util.Objects; +import java.util.function.Function; /** A {@link StreamPartitioner} to partition
[flink-connector-pulsar] branch main updated: [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception. (#24)
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git The following commit(s) were added to refs/heads/main by this push: new 2278653 [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception. (#24) 2278653 is described below commit 2278653d67a8ddf171c88d538a288e503221625a Author: Yufan Sheng AuthorDate: Mon Feb 13 21:46:26 2023 +0800 [FLINK-30109][Connector/Pulsar] Drop the use of sneaky exception. (#24) --- .../pulsar/common/config/PulsarClientFactory.java | 21 ++-- .../pulsar/common/utils/PulsarExceptionUtils.java | 83 .../common/utils/PulsarTransactionUtils.java | 26 ++-- .../flink/connector/pulsar/sink/PulsarSink.java| 5 +- .../pulsar/sink/committer/PulsarCommitter.java | 7 +- .../pulsar/sink/config/SinkConfiguration.java | 2 +- .../connector/pulsar/sink/writer/PulsarWriter.java | 6 +- .../pulsar/sink/writer/topic/MetadataListener.java | 4 +- .../pulsar/sink/writer/topic/ProducerRegister.java | 31 +++-- .../connector/pulsar/source/PulsarSource.java | 7 +- .../pulsar/source/PulsarSourceBuilder.java | 2 +- .../source/enumerator/PulsarSourceEnumerator.java | 53 .../source/enumerator/cursor/StopCursor.java | 2 +- .../cursor/stop/LatestMessageStopCursor.java | 7 +- .../enumerator/subscriber/PulsarSubscriber.java| 15 ++- .../subscriber/impl/BasePulsarSubscriber.java | 50 +--- .../subscriber/impl/TopicListSubscriber.java | 32 ++--- .../subscriber/impl/TopicPatternSubscriber.java| 74 ++- .../source/reader/PulsarPartitionSplitReader.java | 29 +++-- .../source/reader/PulsarSourceFetcherManager.java | 22 ++-- .../pulsar/source/split/PulsarPartitionSplit.java | 7 +- .../pulsar/common/schema/PulsarSchemaTest.java | 1 + .../sink/writer/topic/MetadataListenerTest.java| 6 +- .../sink/writer/topic/ProducerRegisterTest.java| 6 +- .../enumerator/PulsarSourceEnumeratorTest.java | 25 ++-- .../source/enumerator/cursor/StopCursorTest.java | 4 +- 
.../subscriber/PulsarSubscriberTest.java | 41 +++--- .../reader/PulsarPartitionSplitReaderTest.java | 74 +-- .../source/reader/PulsarSourceReaderTest.java | 2 +- .../PulsarDeserializationSchemaTest.java | 16 +-- .../pulsar/testutils/PulsarTestCommonUtils.java| 6 - .../pulsar/testutils/PulsarTestEnvironment.java| 8 +- .../pulsar/testutils/function/ControlSource.java | 14 +-- .../pulsar/testutils/runtime/PulsarRuntime.java| 4 +- .../testutils/runtime/PulsarRuntimeOperator.java | 140 - .../runtime/container/PulsarContainerRuntime.java | 16 +-- .../testutils/sink/PulsarSinkTestContext.java | 7 +- .../sink/reader/PulsarPartitionDataReader.java | 10 +- .../cases/MultipleTopicsConsumingContext.java | 12 +- .../source/cases/SingleTopicConsumingContext.java | 17 ++- .../writer/KeyedPulsarPartitionDataWriter.java | 15 ++- .../source/writer/PulsarEncryptDataWriter.java | 19 ++- .../source/writer/PulsarPartitionDataWriter.java | 7 +- 43 files changed, 453 insertions(+), 482 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java index 1f01b24..4939a23 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import java.util.Map; @@ -76,7 +77,6 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY; -import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.pulsar.client.api.SizeUnit.BYTES; /** The factory for creating pulsar client classes from {@link PulsarConfiguration}. */ @@ -88,7 +88,8 @@ public final class PulsarClientFactory { } /** Create a PulsarClient by using the flink
[flink-connector-pulsar] branch main updated: [FLINK-30489][Connector/Pulsar] Shade all the dependencies in flink-sql-connector-pulsar. (#26)
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git The following commit(s) were added to refs/heads/main by this push: new 5f0bb2d [FLINK-30489][Connector/Pulsar] Shade all the dependencies in flink-sql-connector-pulsar. (#26) 5f0bb2d is described below commit 5f0bb2db9f1f357a11cce965eb5832bb908523a1 Author: Yufan Sheng AuthorDate: Mon Feb 13 21:41:52 2023 +0800 [FLINK-30489][Connector/Pulsar] Shade all the dependencies in flink-sql-connector-pulsar. (#26) --- flink-sql-connector-pulsar/pom.xml | 25 + 1 file changed, 25 insertions(+) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index bec9d3d..eec5b3d 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -74,6 +74,31 @@ under the License. org.slf4j:jul-to-slf4j + + + org.apache.pulsar:pulsar-client-all + + META-INF/versions/**/* + io/swagger/**/* + + + + org.bouncycastle:* + + META-INF/versions/**/* + + + + + + org.bouncycastle + org.apache.pulsar.shade.org.bouncycastle + + + com.scurrilous + org.apache.pulsar.shade.com.scurrilous + +
[flink-connector-jdbc] 09/14: [FLINK-30790] Change Postgres tests to use new PostgresDatabase
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 97fa1144e788f25f9f2c321ea8d18bcbd45f41d6 Author: Joao Boto AuthorDate: Mon Jan 30 17:30:33 2023 +0100 [FLINK-30790] Change Postgres tests to use new PostgresDatabase --- .../jdbc/catalog/PostgresCatalogTestBase.java | 26 ++--- .../catalog/factory/JdbcCatalogFactoryTest.java| 26 ++--- .../jdbc/databases/postgres/PostgresDatabase.java | 46 ++- .../postgres/PostgresExactlyOnceSinkE2eTest.java | 65 +- 4 files changed, 57 insertions(+), 106 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java index 2666bbc..bd3b982 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.postgres.PostgresDatabase; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.types.logical.DecimalType; @@ -26,11 +26,6 @@ import org.apache.flink.table.types.logical.DecimalType; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import java.sql.Connection; import java.sql.DriverManager; @@ -38,17 
+33,13 @@ import java.sql.SQLException; import java.sql.Statement; /** Test base for {@link PostgresCatalog}. */ -@Testcontainers -class PostgresCatalogTestBase { +class PostgresCatalogTestBase implements PostgresDatabase { public static final Logger LOG = LoggerFactory.getLogger(PostgresCatalogTestBase.class); -protected static final DockerImageName POSTGRES_IMAGE = -DockerImageName.parse(DockerImageVersions.POSTGRES); - protected static final String TEST_CATALOG_NAME = "mypg"; -protected static final String TEST_USERNAME = "postgres"; -protected static final String TEST_PWD = "postgres"; +protected static final String TEST_USERNAME = CONTAINER.getUsername(); +protected static final String TEST_PWD = CONTAINER.getPassword(); protected static final String TEST_DB = "test"; protected static final String TEST_SCHEMA = "test_schema"; protected static final String TABLE1 = "t1"; @@ -64,17 +55,10 @@ class PostgresCatalogTestBase { protected static String baseUrl; protected static PostgresCatalog catalog; -@Container -static final PostgreSQLContainer POSTGRES_CONTAINER = -new PostgreSQLContainer<>(POSTGRES_IMAGE) -.withUsername(TEST_USERNAME) -.withPassword(TEST_PWD) -.withLogConsumer(new Slf4jLogConsumer(LOG)); - @BeforeAll static void init() throws SQLException { // jdbc:postgresql://localhost:50807/postgres?user=postgres -String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl(); +String jdbcUrl = CONTAINER.getJdbcUrl(); // jdbc:postgresql://localhost:50807/ baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java index e3aaefc..705e494 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java +++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java @@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.catalog.factory; import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; import org.apache.flink.connector.jdbc.catalog.PostgresCatalog; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.postgres.PostgresDatabase; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.factories.FactoryUtil; @@ -29,11 +29,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
[flink-connector-jdbc] 06/14: [FLINK-30790] Change H2 and some Derby tests to new implementation
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit d2564a94f2da0a2dd803d87d89822f3bf251c54a Author: Joao Boto AuthorDate: Mon Jan 30 16:52:28 2023 +0100 [FLINK-30790] Change H2 and some Derby tests to new implementation --- .../apache/flink/connector/jdbc/DbMetadata.java| 51 .../flink/connector/jdbc/JdbcDataTestBase.java | 5 +- .../apache/flink/connector/jdbc/JdbcITCase.java| 15 +++--- .../flink/connector/jdbc/JdbcInputFormatTest.java | 10 +--- .../connector/jdbc/JdbcRowOutputFormatTest.java| 3 +- .../apache/flink/connector/jdbc/JdbcTestBase.java | 12 ++--- .../flink/connector/jdbc/JdbcTestFixture.java | 30 .../connector/jdbc/databases/DatabaseMetadata.java | 39 +-- .../jdbc/databases/derby/DerbyDatabase.java| 3 +- .../jdbc/databases/derby/DerbyMetadata.java| 1 - .../connector/jdbc/databases/h2/H2Metadata.java| 3 +- .../h2/xa}/H2XaConnectionWrapper.java | 2 +- .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java | 2 +- .../h2/xa}/H2XaResourceWrapper.java| 2 +- .../{xa/h2 => databases/h2/xa}/package-info.java | 2 +- .../jdbc/databases/mysql/MySqlMetadata.java| 2 - .../jdbc/databases/oracle/OracleMetadata.java | 1 - .../jdbc/databases/postgres/PostgresMetadata.java | 1 - .../databases/sqlserver/SqlServerMetadata.java | 1 - .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 4 +- .../oracle/OracleExactlyOnceSinkE2eTest.java | 4 +- .../postgres/PostgresExactlyOnceSinkE2eTest.java | 4 +- .../connector/jdbc/internal/JdbcFullTest.java | 15 +++--- .../jdbc/internal/JdbcTableOutputFormatTest.java | 12 ++--- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 16 +++ .../jdbc/table/JdbcDynamicTableSinkITCase.java | 15 ++ .../connector/jdbc/table/JdbcOutputFormatTest.java | 3 +- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 6 +-- .../connector/jdbc/xa/JdbcXaFacadeImplTest.java| 8 ++-- .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java | 8 ++-- 
.../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java | 12 ++--- .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 10 ++-- .../jdbc/xa/JdbcXaSinkNoInsertionTest.java | 9 +--- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 16 +++ .../flink/connector/jdbc/xa/h2/H2DbMetadata.java | 56 -- 35 files changed, 130 insertions(+), 253 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java deleted file mode 100644 index 55c5317..000 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.jdbc; - -import javax.sql.XADataSource; - -import java.io.Serializable; - -/** Describes a database: driver, schema and urls. 
*/ -public interface DbMetadata extends Serializable { - -default String getInitUrl() { -return getUrl(); -} - -String getUrl(); - -default String getUser() { -return ""; -} - -default String getPassword() { -return ""; -} - -XADataSource buildXaDataSource(); - -String getDriverClass(); - -default JdbcConnectionOptions toConnectionOptions() { -return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() -.withDriverName(getDriverClass()) -.withUrl(getUrl()) -.build(); -} -} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java index 003cf57..8b8a5aa 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java @@ -19,6
[flink-connector-jdbc] 14/14: [FLINK-30790] Cleaning metadata deprecated methods and disableOnMac annotation
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 753ea0e87d05e0eb7b4419cd1653bd36ba986aa5 Author: Joao Boto AuthorDate: Wed Feb 8 12:05:08 2023 +0100 [FLINK-30790] Cleaning metadata deprecated methods and disableOnMac annotation --- .../flink/connector/jdbc/JdbcDataTestBase.java | 4 +-- .../apache/flink/connector/jdbc/JdbcITCase.java| 8 +++--- .../flink/connector/jdbc/JdbcInputFormatTest.java | 30 +++--- .../connector/jdbc/JdbcRowOutputFormatTest.java| 22 .../apache/flink/connector/jdbc/JdbcTestBase.java | 2 +- .../flink/connector/jdbc/JdbcTestFixture.java | 2 +- .../connector/jdbc/databases/DatabaseMetadata.java | 10 +--- .../jdbc/databases/mysql/MySqlMetadata.java| 2 +- .../jdbc/databases/oracle/OracleDatabase.java | 8 ++ .../{OracleDatabase.java => OracleImages.java} | 25 ++ .../jdbc/databases/oracle/OracleMetadata.java | 10 ++-- .../{OracleDatabase.java => OracleXaDatabase.java} | 7 ++--- .../jdbc/databases/postgres/PostgresMetadata.java | 2 +- .../databases/sqlserver/SqlServerMetadata.java | 3 ++- .../oracle/OracleExactlyOnceSinkE2eTest.java | 14 ++ .../jdbc/dialect/oracle/OracleTableSinkITCase.java | 3 --- .../dialect/oracle/OracleTableSourceITCase.java| 3 --- .../sqlserver/SqlServerTableSinkITCase.java| 2 +- .../connector/jdbc/internal/JdbcFullTest.java | 8 +++--- .../jdbc/internal/JdbcTableOutputFormatTest.java | 8 +++--- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 5 ++-- .../jdbc/table/JdbcDynamicTableSinkITCase.java | 2 +- .../connector/jdbc/table/JdbcOutputFormatTest.java | 24 - .../jdbc/table/JdbcRowDataInputFormatTest.java | 24 - .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 7 + 25 files changed, 87 insertions(+), 148 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java index 8b8a5aa..de23d88 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java @@ -35,8 +35,8 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB import static org.mockito.Mockito.doReturn; /** - * Base class for JDBC test using data from {@link JdbcTestFixture}. It uses {@link DerbyDbMetadata} - * and inserts data before each test. + * Base class for JDBC test using data from {@link JdbcTestFixture}. It uses {@link + * org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata} and inserts data before each test. */ public abstract class JdbcDataTestBase extends JdbcTestBase { @BeforeEach diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java index 41a8082..8c521dc 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java @@ -74,7 +74,7 @@ public class JdbcITCase extends JdbcTestBase { String.format(INSERT_TEMPLATE, INPUT_TABLE), TEST_ENTRY_JDBC_STATEMENT_BUILDER, new JdbcConnectionOptionsBuilder() -.withUrl(getMetadata().getUrl()) +.withUrl(getMetadata().getJdbcUrl()) .withDriverName(getMetadata().getDriverClass()) .build())); env.execute(); @@ -108,7 +108,7 @@ public class JdbcITCase extends JdbcTestBase { ps.setString(2, e.content); }, new JdbcConnectionOptionsBuilder() -.withUrl(getMetadata().getUrl()) +.withUrl(getMetadata().getJdbcUrl()) .withDriverName(getMetadata().getDriverClass()) .build())); env.execute(); @@ -118,7 +118,7 @@ public class JdbcITCase extends JdbcTestBase { private List selectWords() throws SQLException { ArrayList strings = new 
ArrayList<>(); -try (Connection connection = DriverManager.getConnection(getMetadata().getUrl())) { +try (Connection connection = DriverManager.getConnection(getMetadata().getJdbcUrl())) { try (Statement st
[flink-connector-jdbc] 07/14: [FLINK-30790] Change Oracle tests to use new OracleDatabase
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit a4e200fb839b8ad7b82ab7aea7f62415d14d4a4e Author: Joao Boto AuthorDate: Mon Jan 30 17:22:05 2023 +0100 [FLINK-30790] Change Oracle tests to use new OracleDatabase --- .../connector/jdbc/databases/DatabaseMetadata.java | 2 + .../jdbc/databases/derby/DerbyMetadata.java| 5 ++ .../connector/jdbc/databases/h2/H2Metadata.java| 5 ++ .../jdbc/databases/mysql/MySqlMetadata.java| 5 ++ .../jdbc/databases/oracle/OracleDatabase.java | 9 +- .../jdbc/databases/oracle/OracleMetadata.java | 5 ++ .../jdbc/databases/postgres/PostgresMetadata.java | 5 ++ .../databases/sqlserver/SqlServerMetadata.java | 5 ++ .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 5 -- .../jdbc/dialect/oracle/OracleContainer.java | 99 -- .../oracle/OracleExactlyOnceSinkE2eTest.java | 25 ++ .../jdbc/dialect/oracle/OracleTableSinkITCase.java | 66 +-- .../dialect/oracle/OracleTableSourceITCase.java| 33 +--- .../postgres/PostgresExactlyOnceSinkE2eTest.java | 5 -- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 6 +- 15 files changed, 109 insertions(+), 171 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java index 2d3fbec..3eeed1d 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java @@ -39,6 +39,8 @@ public interface DatabaseMetadata extends Serializable { String getJdbcUrl(); +String getJdbcUrlWithCredentials(); + String getUsername(); String getPassword(); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java index 33907db..960a56c 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java @@ -40,6 +40,11 @@ public class DerbyMetadata implements DatabaseMetadata { return String.format("jdbc:derby:%s", dbName); } +@Override +public String getJdbcUrlWithCredentials() { +return getJdbcUrl(); +} + @Override public String getUsername() { return ""; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java index d94e4d6..95db4c6 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java @@ -36,6 +36,11 @@ public class H2Metadata implements DatabaseMetadata { return String.format("jdbc:h2:mem:%s", schema); } +@Override +public String getJdbcUrlWithCredentials() { +return getJdbcUrl(); +} + @Override public String getUsername() { return ""; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java index 9125232..58956ad 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlMetadata.java @@ -52,6 +52,11 @@ public class MySqlMetadata implements DatabaseMetadata { return this.url; } +@Override +public String getJdbcUrlWithCredentials() { +return String.format("%s?user=%s&password=%s", this.url, this.username, 
this.password); +} + @Override public String getUsername() { return this.username; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java index d800e38..70e503e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java @@ -19,9 +19,10 @@ package org.apache.flink.connector.jdbc.databases.oracle; import
[flink-connector-jdbc] 11/14: [FLINK-30790] Remove DockerImageVersions
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 12720ae4135048b1300440991fb3b5d10fc5d8f7 Author: Joao Boto AuthorDate: Tue Jan 31 16:45:34 2023 +0100 [FLINK-30790] Remove DockerImageVersions --- .../jdbc/catalog/MySql56CatalogITCase.java | 5 +-- .../jdbc/catalog/MySql57CatalogITCase.java | 5 +-- .../connector/jdbc/catalog/MySqlCatalogITCase.java | 4 +- .../jdbc/databases/mysql/MySqlDatabase.java| 8 ++-- .../jdbc/databases/oracle/OracleDatabase.java | 6 ++- .../jdbc/databases/postgres/PostgresDatabase.java | 8 ++-- .../databases/sqlserver/SqlServerDatabase.java | 11 +- .../sqlserver/SqlServerTableSinkITCase.java| 3 -- .../sqlserver/SqlServerTableSourceITCase.java | 3 -- .../connector/jdbc/test/DockerImageVersions.java | 45 -- 10 files changed, 28 insertions(+), 70 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java index 3a1c554..fb553c6 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.junit.jupiter.Container; @@ -29,8 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; public class MySql56CatalogITCase extends MySqlCatalogTestBase { @Container -private static final MySQLContainer CONTAINER = -createContainer(DockerImageVersions.MYSQL_5_6); +private static final MySQLContainer CONTAINER = 
createContainer(MySqlDatabase.MYSQL_5_6); @Override protected String getDatabaseUrl() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java index 350bea8..9d0edcf 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.junit.jupiter.Container; @@ -29,8 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; public class MySql57CatalogITCase extends MySqlCatalogTestBase { @Container -private static final MySQLContainer CONTAINER = -createContainer(DockerImageVersions.MYSQL_5_7); +private static final MySQLContainer CONTAINER = createContainer(MySqlDatabase.MYSQL_5_7); @Override protected String getDatabaseUrl() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java index 73b5acf..3480fa7 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.jdbc.catalog; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.testcontainers.containers.MySQLContainer; import 
org.testcontainers.junit.jupiter.Container; @@ -29,7 +29,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; public class MySqlCatalogITCase extends MySqlCatalogTestBase { @Container -private static final MySQLContainer CONTAINER = createContainer(DockerImageVersions.MYSQL); +private static final MySQLContainer CONTAINER = createContainer(MySqlDatabase.MYSQL_8_0); @Override protected String getDatabaseUrl() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java index f8e70a9..6e8c69e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/mysql/MySqlDatabase.java
[flink-connector-jdbc] 13/14: [FLINK-30790] Small test fixings
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit cbb1c3538ee6b93e086eca7efffeac73279e98c9 Author: Joao Boto AuthorDate: Tue Feb 7 16:12:25 2023 +0100 [FLINK-30790] Small test fixings --- flink-connector-jdbc/pom.xml | 2 +- .../flink/connector/jdbc/JdbcInputFormatTest.java | 14 --- .../jdbc/catalog/MySqlCatalogTestBase.java | 18 - .../jdbc/databases/derby/DerbyDatabase.java| 2 +- .../connector/jdbc/databases/h2/H2XaDatabase.java | 2 +- .../jdbc/databases/mysql/MySqlDatabase.java| 2 +- .../jdbc/databases/oracle/OracleDatabase.java | 5 +-- .../jdbc/databases/oracle/OracleMetadata.java | 14 +-- .../jdbc/databases/postgres/PostgresDatabase.java | 2 +- .../databases/sqlserver/SqlServerDatabase.java | 2 +- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 12 +- .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java | 44 +- .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java | 8 +++- .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 13 +-- .../jdbc/xa/JdbcXaSinkNoInsertionTest.java | 3 +- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 21 ++- 16 files changed, 66 insertions(+), 98 deletions(-) diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml index fa09d49..5fcdb41 100644 --- a/flink-connector-jdbc/pom.xml +++ b/flink-connector-jdbc/pom.xml @@ -39,7 +39,7 @@ under the License. 
2.12.7 3.23.1 42.5.1 - 19.3.0.0 + 21.8.0.0 1.12.10 diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java index ca6baa2..d766f07 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java @@ -29,8 +29,10 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.io.Serializable; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB; import static org.apache.flink.connector.jdbc.JdbcTestFixture.ROW_TYPE_INFO; @@ -180,8 +182,7 @@ class JdbcInputFormatTest extends JdbcDataTestBase { } @Test -void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() -throws SQLException, ClassNotFoundException { +void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException { jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat() .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass()) @@ -191,10 +192,11 @@ class JdbcInputFormatTest extends JdbcDataTestBase { .finish(); jdbcInputFormat.openInputFormat(); -final int defaultFetchSize = - DERBY_EBOOKSHOP_DB.getConnection().createStatement().getFetchSize(); - - assertThat(jdbcInputFormat.getStatement().getFetchSize()).isEqualTo(defaultFetchSize); +try (Connection dbConn = DERBY_EBOOKSHOP_DB.getConnection(); +Statement dbStatement = dbConn.createStatement(); +Statement inputStatement = jdbcInputFormat.getStatement()) { + assertThat(inputStatement.getFetchSize()).isEqualTo(dbStatement.getFetchSize()); +} } @Test diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java index f8b9b07..e46b029 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java @@ -257,7 +257,7 @@ abstract class MySqlCatalogTestBase { } @Test -void testGetDb_DatabaseNotExistException() throws Exception { +void testGetDb_DatabaseNotExistException() { String databaseNotExist = "nonexistent"; assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist)) .satisfies( @@ -275,7 +275,7 @@ abstract class MySqlCatalogTestBase { } @Test -void testDbExists() throws Exception { +void testDbExists() { String databaseNotExist = "nonexistent"; assertThat(catalog.databaseExists(databaseNotExist)).isFalse(); assertThat(catalog.databaseExists(TEST_DB)).isTrue(); @@ -296,7 +296,7 @@ abstract class MySqlCatalogTestBase { }
[flink-connector-jdbc] 08/14: [FLINK-30790] Change SqlServer tests to use new SqlServerDatabase
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 5623cb7cd76e697e5c5c5a271eadb5402622f181 Author: Joao Boto AuthorDate: Mon Jan 30 17:20:47 2023 +0100 [FLINK-30790] Change SqlServer tests to use new SqlServerDatabase --- .../sqlserver/SqlServerTableSinkITCase.java| 96 +++--- .../sqlserver/SqlServerTableSourceITCase.java | 32 2 files changed, 66 insertions(+), 62 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java index b40466b..42daf51 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.dialect.sqlserver; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.databases.sqlserver.SqlServerDatabase; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.datastream.DataStream; @@ -48,9 +49,8 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.MSSQLServerContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import java.math.BigDecimal; import java.sql.Connection; @@ 
-69,15 +69,8 @@ import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; /** The Table Sink ITCase for {@link SqlServerDialect}. */ -@Testcontainers -class SqlServerTableSinkITCase extends AbstractTestBase { - -@Container -private static final MSSQLServerContainer container = -new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04") -.acceptLicense(); - -private static String containerUrl; +@DisabledOnOs(OS.MAC) +class SqlServerTableSinkITCase extends AbstractTestBase implements SqlServerDatabase { public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert"; public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend"; @@ -88,14 +81,12 @@ class SqlServerTableSinkITCase extends AbstractTestBase { @BeforeAll static void beforeAll() throws ClassNotFoundException, SQLException { -containerUrl = -String.format( -"%s;username=%s;password=%s", -container.getJdbcUrl(), container.getUsername(), container.getPassword()); -Class.forName(container.getDriverClassName()); +Class.forName(CONTAINER.getDriverClassName()); try (Connection conn = DriverManager.getConnection( -containerUrl, container.getUsername(), container.getPassword()); +CONTAINER.getJdbcUrl(), +CONTAINER.getUsername(), +CONTAINER.getPassword()); Statement stat = conn.createStatement()) { stat.executeUpdate( "CREATE TABLE " @@ -143,10 +134,12 @@ class SqlServerTableSinkITCase extends AbstractTestBase { @AfterAll static void afterAll() throws Exception { TestValuesTableFactory.clearAllData(); -Class.forName(container.getDriverClassName()); +Class.forName(CONTAINER.getDriverClassName()); try (Connection conn = DriverManager.getConnection( -containerUrl, container.getUsername(), container.getPassword()); +CONTAINER.getJdbcUrl(), +CONTAINER.getUsername(), +CONTAINER.getPassword()); Statement stat = conn.createStatement()) { stat.execute("DROP TABLE " + OUTPUT_TABLE1); stat.execute("DROP TABLE " + 
OUTPUT_TABLE2); @@ -155,7 +148,6 @@ class SqlServerTableSinkITCase extends AbstractTestBase { stat.execute("DROP TABLE " + OUTPUT_TABLE5); stat.execute("DROP TABLE " + USER_TABLE); } -container.stop(); } public static DataStream> get4TupleDataStream( @@ -207,21
[flink-connector-jdbc] 10/14: [FLINK-30790] Change MySql tests to use new MySqlDatabase
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 0bc576ae52b965d3ef4d56b4a4b85459cbf7fa7c Author: Joao Boto AuthorDate: Mon Jan 30 17:41:33 2023 +0100 [FLINK-30790] Change MySql tests to use new MySqlDatabase --- .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 194 + .../jdbc/table/UnsignedTypeConversionITCase.java | 37 +--- 2 files changed, 7 insertions(+), 224 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java index 7100c2b..81d4690 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -1,47 +1,19 @@ package org.apache.flink.connector.jdbc.dialect.mysql; -import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; -import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata; -import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.SerializableSupplier; import com.mysql.cj.jdbc.MysqlXADataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.MySQLContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; import javax.sql.XADataSource; -import java.sql.Connection; -import java.sql.DriverManager; -import 
java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -import static org.apache.flink.util.Preconditions.checkArgument; - /** * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for issues with errors on * closing connections. */ -@Testcontainers -public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { - -@Container -private static final MySqlXaContainer CONTAINER = -new MySqlXaContainer(DockerImageVersions.MYSQL) -.withLockWaitTimeout( -(CHECKPOINT_TIMEOUT_MS + TASK_CANCELLATION_TIMEOUT_MS) * 2); - -@Override -public DatabaseMetadata getMetadata() { -return new MySqlMetadata(CONTAINER); -} +public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest +implements MySqlDatabase { @Override public SerializableSupplier getDataSourceSupplier() { @@ -53,164 +25,4 @@ public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { return xaDataSource; }; } - -/** {@link MySQLContainer} with XA enabled. 
*/ -static class MySqlXaContainer extends MySQLContainer { -private long lockWaitTimeout = 0; -private volatile InnoDbStatusLogger innoDbStatusLogger; - -public MySqlXaContainer(String dockerImageName) { -super(DockerImageName.parse(dockerImageName)); -} - -public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) { -checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be greater than 0"); -this.lockWaitTimeout = lockWaitTimeout; -return this.self(); -} - -@Override -public void start() { -super.start(); -// prevent XAER_RMERR: Fatal error occurred in the transaction branch - check your -// data for consistency works for mysql v8+ -try (Connection connection = -DriverManager.getConnection(getJdbcUrl(), "root", getPassword())) { -prepareDb(connection, lockWaitTimeout); -} catch (SQLException e) { -ExceptionUtils.rethrow(e); -} - -this.innoDbStatusLogger = -new InnoDbStatusLogger( -getJdbcUrl(), "root", getPassword(), lockWaitTimeout / 2); -innoDbStatusLogger.start(); -} - -@Override -public void stop() { -try { -innoDbStatusLogger.stop(); -} catch (Exception e) { -ExceptionUtils.rethrow(e); -} finally { -super.stop(); -} -} - -private void prepareDb(Connection connection, long lockWaitTimeout) throws SQLException { -try (Statement st = connection.createStatement()) { -st.execute("GRANT
[flink-connector-jdbc] 12/14: [FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 867fc0f29a1eb140af0201d65cfc57eaf447dad0 Author: Joao Boto AuthorDate: Tue Jan 31 17:52:50 2023 +0100 [FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure --- .../apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java | 2 +- .../org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java | 2 +- .../flink/connector/jdbc/databases/oracle/OracleDatabase.java | 5 +++-- .../connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 7 +++ .../jdbc/dialect/oracle/OracleExactlyOnceSinkE2eTest.java | 7 +++ .../jdbc/dialect/postgres/PostgresExactlyOnceSinkE2eTest.java | 7 +++ .../apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 2 +- 7 files changed, 27 insertions(+), 5 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java index 960a56c..087aa58 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/derby/DerbyMetadata.java @@ -23,7 +23,7 @@ import org.apache.derby.jdbc.EmbeddedXADataSource; import javax.sql.XADataSource; -/** DerbyDbMetadata. */ +/** Derby Metadata. 
*/ public class DerbyMetadata implements DatabaseMetadata { private final String dbName; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java index 95db4c6..ee41685 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/h2/H2Metadata.java @@ -22,7 +22,7 @@ import org.apache.flink.connector.jdbc.databases.h2.xa.H2XaDsWrapper; import javax.sql.XADataSource; -/** H2DbMetadata. */ +/** H2 Metadata. */ public class H2Metadata implements DatabaseMetadata { private final String schema; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java index 13269d2..0ba1f0e 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/oracle/OracleDatabase.java @@ -29,14 +29,15 @@ import org.testcontainers.junit.jupiter.Testcontainers; @Testcontainers public interface OracleDatabase extends DatabaseTest { -String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim-faststart"; +String ORACLE_18 = "gvenzl/oracle-xe:18.4.0-slim"; String ORACLE_21 = "gvenzl/oracle-xe:21.3.0-slim-faststart"; @Container JdbcDatabaseContainer CONTAINER = new OracleContainer(ORACLE_21) .withStartupTimeoutSeconds(240) -.withConnectTimeoutSeconds(120); +.withConnectTimeoutSeconds(120) +.usingSid(); @Override default DatabaseMetadata getMetadata() { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java index 81d4690..24fa8df 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -1,6 +1,8 @@ package org.apache.flink.connector.jdbc.dialect.mysql; +import org.apache.flink.connector.jdbc.databases.DatabaseMetadata; import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase; +import org.apache.flink.connector.jdbc.databases.mysql.MySqlMetadata; import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; import org.apache.flink.util.function.SerializableSupplier; @@ -15,6 +17,11 @@ import javax.sql.XADataSource; public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest implements MySqlDatabase { +@Override +public DatabaseMetadata getMetadata() { +return new MySqlMetadata(CONTAINER, true); +} + @Override public SerializableSupplier getDataSourceSupplier() { return () -> { diff --git
[flink-connector-jdbc] branch main updated (ffee715 -> 753ea0e)
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git from ffee715 [FLINK-30963][ci] Streamline binary URL configuration new 45306bb [FLINK-30790] Change log level to off on CI new 0246064 [FLINK-30790] Refactor JdbcDataTypeTest create tests by dialect new ba9dbc9 [FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create tests by dialect new 7addee1 [FLINK-30790] Refactor MySqlCatalogITCase create tests by database version new b0a25a6 [FLINK-30790] Create unified databases for testing new d2564a9 [FLINK-30790] Change H2 and some Derby tests to new implementation new a4e200f [FLINK-30790] Change Oracle tests to use new OracleDatabase new 5623cb7 [FLINK-30790] Change SqlServer tests to use new SqlServerDatabase new 97fa114 [FLINK-30790] Change Postgres tests to use new PostgresDatabase new 0bc576a [FLINK-30790] Change MySql tests to use new MySqlDatabase new 12720ae [FLINK-30790] Remove DockerImageVersions new 867fc0f [FLINK-30790] Fix JdbcExactlyOnceSinkE2eTest loop on failure new cbb1c35 [FLINK-30790] Small test fixings new 753ea0e [FLINK-30790] Cleaning metadata deprecated methods and disableOnMac annotation The 14 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../6b9ab1b0-c14d-4667-bab5-407b81fba98b | 12 + flink-connector-jdbc/pom.xml | 2 +- .../flink/connector/jdbc/JdbcDataTestBase.java | 9 +- .../flink/connector/jdbc/JdbcDataTypeTest.java | 230 - .../apache/flink/connector/jdbc/JdbcITCase.java| 15 +- .../flink/connector/jdbc/JdbcInputFormatTest.java | 52 +- .../connector/jdbc/JdbcRowOutputFormatTest.java| 25 +- .../apache/flink/connector/jdbc/JdbcTestBase.java | 12 +- .../flink/connector/jdbc/JdbcTestFixture.java | 31 +- ...ablePathTest.java => MySql56CatalogITCase.java} | 22 +- ...ablePathTest.java => MySql57CatalogITCase.java} | 22 +- .../connector/jdbc/catalog/MySqlCatalogITCase.java | 366 +- .../jdbc/catalog/MySqlCatalogTestBase.java | 396 +-- .../jdbc/catalog/PostgresCatalogTestBase.java | 26 +- .../catalog/factory/JdbcCatalogFactoryTest.java| 26 +- .../DatabaseMetadata.java} | 38 +- .../DatabaseTest.java} | 12 +- .../jdbc/databases/derby/DerbyDatabase.java| 51 ++ .../derby/DerbyMetadata.java} | 37 +- .../h2/H2Metadata.java}| 44 +- .../connector/jdbc/databases/h2/H2XaDatabase.java | 50 ++ .../h2/xa}/H2XaConnectionWrapper.java | 2 +- .../{xa/h2 => databases/h2/xa}/H2XaDsWrapper.java | 4 +- .../h2/xa}/H2XaResourceWrapper.java| 2 +- .../{xa/h2 => databases/h2/xa}/package-info.java | 2 +- .../jdbc/databases/mysql/MySqlDatabase.java| 214 .../jdbc/databases/mysql/MySqlMetadata.java| 92 .../oracle/OracleDatabase.java}| 38 +- .../oracle/OracleImages.java} | 12 +- .../jdbc/databases/oracle/OracleMetadata.java | 97 .../oracle/OracleXaDatabase.java} | 39 +- .../jdbc/databases/postgres/PostgresDatabase.java | 85 .../jdbc/databases/postgres/PostgresMetadata.java | 92 .../databases/sqlserver/SqlServerDatabase.java | 47 ++ .../databases/sqlserver/SqlServerMetadata.java | 93 .../jdbc/dialect/JdbcDialectTypeTest.java | 135 + .../jdbc/dialect/mysql/MySqlDialectTypeTest.java | 65 +++ .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 35 ++ .../jdbc/dialect/oracle/OracleContainer.java | 99 
.../jdbc/dialect/oracle/OracleDialectTypeTest.java | 60 +++ .../oracle/OracleExactlyOnceSinkE2eTest.java | 31 ++ .../jdbc/dialect/oracle/OracleTableSinkITCase.java | 63 ++- .../dialect/oracle/OracleTableSourceITCase.java| 30 +- .../dialect/postgres/PostgresDialectTypeTest.java | 65 +++ .../postgres/PostgresExactlyOnceSinkE2eTest.java | 35 ++ .../sqlserver/SqlServerTableSinkITCase.java| 93 ++-- .../sqlserver/SqlServerTableSourceITCase.java | 29 +- .../connector/jdbc/internal/JdbcFullTest.java | 15 +- .../jdbc/internal/JdbcTableOutputFormatTest.java | 12 +- .../jdbc/table/JdbcAppendOnlyWriterTest.java | 17 +- .../jdbc/table/JdbcDynamicTableSinkITCase.java | 15 +- .../connector/jdbc/table/JdbcOutputFormatTest.java | 27 +- .../jdbc/table/JdbcRowDataInputFormatTest.java | 24 +- .../jdbc/table/UnsignedTypeConversionITCase.java | 37 +-
[flink-connector-jdbc] 04/14: [FLINK-30790] Refactor MySqlCatalogITCase create tests by database version
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 7addee18872af6333babbf6314fe073699dbd2f1 Author: Joao Boto AuthorDate: Mon Jan 30 11:38:55 2023 +0100 [FLINK-30790] Refactor MySqlCatalogITCase create tests by database version --- .../6b9ab1b0-c14d-4667-bab5-407b81fba98b | 12 + .../jdbc/catalog/MySql56CatalogITCase.java | 39 ++ .../jdbc/catalog/MySql57CatalogITCase.java | 39 ++ .../connector/jdbc/catalog/MySqlCatalogITCase.java | 366 +-- .../jdbc/catalog/MySqlCatalogTestBase.java | 396 ++--- .../sqlserver/SqlServerTableSinkITCase.java| 3 - .../sqlserver/SqlServerTableSourceITCase.java | 3 - .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 12 +- 8 files changed, 459 insertions(+), 411 deletions(-) diff --git a/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b b/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b index 9af409a..e54d3ff 100644 --- a/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b +++ b/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b @@ -8,6 +8,18 @@ org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase does not satisfy: onl * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule +org.apache.flink.connector.jdbc.catalog.MySql57CatalogITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule +org.apache.flink.connector.jdbc.catalog.MySql56CatalogITCase does not satisfy: only one of the following predicates match:\ +* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ +* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ +* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not satisfy: only one of the following predicates match:\ diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java new file mode 100644 index 000..2ff3ed0 --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with
[flink-connector-jdbc] 03/14: [FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create tests by dialect
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit ba9dbc91a6dc19a49683dc6b558e46be61fd1c88 Author: Joao Boto AuthorDate: Mon Jan 30 15:01:22 2023 +0100 [FLINK-30790] Refactor JdbcExactlyOnceSinkE2eTest create tests by dialect --- .../apache/flink/connector/jdbc/DbMetadata.java| 4 +- .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 220 + .../jdbc/dialect/mysql/MySqlMetadata.java | 82 .../oracle/OracleExactlyOnceSinkE2eTest.java | 46 ++ .../jdbc/dialect/oracle/OracleMetadata.java| 87 .../postgres/PostgresExactlyOnceSinkE2eTest.java | 91 .../jdbc/dialect/postgres/PostgresMetadata.java| 82 .../sqlserver/SqlServerTableSinkITCase.java| 3 + .../sqlserver/SqlServerTableSourceITCase.java | 3 + .../connector/jdbc/test/DockerImageVersions.java | 19 +- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java| 541 ++--- 11 files changed, 671 insertions(+), 507 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java index 21f93d1..55c5317 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java @@ -24,7 +24,9 @@ import java.io.Serializable; /** Describes a database: driver, schema and urls. 
*/ public interface DbMetadata extends Serializable { -String getInitUrl(); +default String getInitUrl() { +return getUrl(); +} String getUrl(); diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java new file mode 100644 index 000..1da2f7c --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlExactlyOnceSinkE2eTest.java @@ -0,0 +1,220 @@ +package org.apache.flink.connector.jdbc.dialect.mysql; + +import org.apache.flink.connector.jdbc.DbMetadata; +import org.apache.flink.connector.jdbc.test.DockerImageVersions; +import org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.SerializableSupplier; + +import com.mysql.cj.jdbc.MysqlXADataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import javax.sql.XADataSource; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A simple end-to-end test for {@link JdbcExactlyOnceSinkE2eTest}. Check for issues with errors on + * closing connections. 
+ */ +@Testcontainers +public class MySqlExactlyOnceSinkE2eTest extends JdbcExactlyOnceSinkE2eTest { + +@Container +private static final MySqlXaContainer CONTAINER = +new MySqlXaContainer(DockerImageVersions.MYSQL) +.withLockWaitTimeout( +(CHECKPOINT_TIMEOUT_MS + TASK_CANCELLATION_TIMEOUT_MS) * 2); + +@Override +protected String getDockerVersion() { +return CONTAINER.getDockerImageName(); +} + +@Override +protected DbMetadata getDbMetadata() { +return new MySqlMetadata(CONTAINER); +} + +@Override +public SerializableSupplier getDataSourceSupplier() { +return () -> { +MysqlXADataSource xaDataSource = new MysqlXADataSource(); +xaDataSource.setUrl(CONTAINER.getJdbcUrl()); +xaDataSource.setUser(CONTAINER.getUsername()); +xaDataSource.setPassword(CONTAINER.getPassword()); +return xaDataSource; +}; +} + +/** {@link MySQLContainer} with XA enabled. */ +static class MySqlXaContainer extends MySQLContainer { +private long lockWaitTimeout = 0; +private volatile InnoDbStatusLogger innoDbStatusLogger; + +public MySqlXaContainer(String dockerImageName) { +super(DockerImageName.parse(dockerImageName)); +} + +public MySqlXaContainer withLockWaitTimeout(long lockWaitTimeout) { +checkArgument(lockWaitTimeout >= 0, "lockWaitTimeout should be greater than 0"); +this.lockWaitTimeout = lockWaitTimeout; +return this.self(); +} + +@Override +public void start() { +
[flink-connector-jdbc] 02/14: [FLINK-30790] Refactor JdbcDataTypeTest create tests by dialect
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 0246064b1262d6e8889dfdc96a1ea1e6a6a640aa Author: Joao Boto AuthorDate: Mon Jan 30 10:30:08 2023 +0100 [FLINK-30790] Refactor JdbcDataTypeTest create tests by dialect --- .../flink/connector/jdbc/JdbcDataTypeTest.java | 230 - .../jdbc/dialect/JdbcDialectTypeTest.java | 135 .../jdbc/dialect/mysql/MySqlDialectTypeTest.java | 65 ++ .../jdbc/dialect/oracle/OracleDialectTypeTest.java | 60 ++ .../dialect/postgres/PostgresDialectTypeTest.java | 65 ++ 5 files changed, 325 insertions(+), 230 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java deleted file mode 100644 index 75be19a..000 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.jdbc; - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import javax.annotation.Nullable; - -import java.util.Arrays; -import java.util.List; - -import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** Tests for all DataTypes and Dialects of JDBC connector. */ -@ExtendWith(ParameterizedTestExtension.class) -public class JdbcDataTypeTest { - -private static final String DDL_FORMAT = -"CREATE TABLE T(\n" -+ "f0 %s\n" -+ ") WITH (\n" -+ " 'connector'='jdbc',\n" -+ " 'url'='" -+ "jdbc:%s:memory:test" -+ "',\n" -+ " 'table-name'='myTable'\n" -+ ")"; - -@Parameters(name = "{0}") -public static List testData() { -return Arrays.asList( -createTestItem("derby", "CHAR"), -createTestItem("derby", "VARCHAR"), -createTestItem("derby", "BOOLEAN"), -createTestItem("derby", "TINYINT"), -createTestItem("derby", "SMALLINT"), -createTestItem("derby", "INTEGER"), -createTestItem("derby", "BIGINT"), -createTestItem("derby", "FLOAT"), -createTestItem("derby", "DOUBLE"), -createTestItem("derby", "DECIMAL(10, 4)"), -createTestItem("derby", "DATE"), -createTestItem("derby", "TIME"), -createTestItem("derby", "TIMESTAMP(3)"), -createTestItem("derby", "TIMESTAMP WITHOUT TIME ZONE"), -createTestItem("derby", "TIMESTAMP(9) WITHOUT TIME ZONE"), -createTestItem("derby", "VARBINARY"), -createTestItem("mysql", "CHAR"), -createTestItem("mysql", "VARCHAR"), 
-createTestItem("mysql", "BOOLEAN"), -createTestItem("mysql", "TINYINT"), -createTestItem("mysql", "SMALLINT"), -createTestItem("mysql", "INTEGER"), -createTestItem("mysql", "BIGINT"), -createTestItem("mysql", "FLOAT"), -createTestItem("mysql", "DOUBLE"), -createTestItem("mysql", "DECIMAL(10, 4)"), -createTestItem("mysql", "DECIMAL(38, 18)"), -createTestItem("mysql", "DATE"), -
[flink-connector-jdbc] 05/14: [FLINK-30790] Create unified databases for testing
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit b0a25a688679595b0f2d128fb4b88758ce8629da Author: Joao Boto AuthorDate: Thu Feb 2 09:12:37 2023 +0100 [FLINK-30790] Create unified databases for testing --- .../flink/connector/jdbc/JdbcTestFixture.java | 5 +- .../jdbc/catalog/MySql56CatalogITCase.java | 2 +- .../jdbc/catalog/MySql57CatalogITCase.java | 2 +- .../connector/jdbc/databases/DatabaseMetadata.java | 40 ++ .../connector/jdbc/databases/DatabaseTest.java | 24 + .../jdbc/databases/derby/DerbyDatabase.java| 50 + .../derby/DerbyMetadata.java} | 31 +++ .../h2/H2Metadata.java}| 47 .../connector/jdbc/databases/h2/H2XaDatabase.java | 50 + .../mysql/MySqlDatabase.java} | 62 ++ .../mysql/MySqlMetadata.java | 37 +++-- .../jdbc/databases/oracle/OracleDatabase.java | 38 + .../oracle/OracleMetadata.java | 36 +++-- .../jdbc/databases/postgres/PostgresDatabase.java | 43 +++ .../postgres/PostgresMetadata.java | 34 +++- .../databases/sqlserver/SqlServerDatabase.java | 40 ++ .../sqlserver/SqlServerMetadata.java} | 38 +++-- .../dialect/mysql/MySqlExactlyOnceSinkE2eTest.java | 1 + .../oracle/OracleExactlyOnceSinkE2eTest.java | 1 + .../postgres/PostgresExactlyOnceSinkE2eTest.java | 1 + .../flink/connector/jdbc/xa/h2/H2XaDsWrapper.java | 2 +- 21 files changed, 452 insertions(+), 132 deletions(-) diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java index e9b2953..d00fbad 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import 
org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.connector.jdbc.databases.derby.DerbyMetadata; import org.apache.flink.connector.jdbc.xa.h2.H2DbMetadata; import org.apache.flink.table.types.logical.RowType; @@ -74,8 +75,8 @@ public class JdbcTestFixture { }; private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop"; -public static final DerbyDbMetadata DERBY_EBOOKSHOP_DB = -new DerbyDbMetadata(EBOOKSHOP_SCHEMA_NAME); +public static final DerbyMetadata DERBY_EBOOKSHOP_DB = +new DerbyMetadata(EBOOKSHOP_SCHEMA_NAME); public static final H2DbMetadata H2_EBOOKSHOP_DB = new H2DbMetadata(EBOOKSHOP_SCHEMA_NAME); /** TestEntry. */ diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java index 2ff3ed0..3a1c554 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql56CatalogITCase.java @@ -24,7 +24,7 @@ import org.testcontainers.containers.MySQLContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -/** E2E test for {@link MySqlCatalog}. */ +/** E2E test for {@link MySqlCatalog} with MySql version 5.6. 
*/ @Testcontainers public class MySql56CatalogITCase extends MySqlCatalogTestBase { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java index 0a1dc8b..350bea8 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySql57CatalogITCase.java @@ -24,7 +24,7 @@ import org.testcontainers.containers.MySQLContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -/** E2E test for {@link MySqlCatalog}. */ +/** E2E test for {@link MySqlCatalog} with MySql version 5.7. */ @Testcontainers public class MySql57CatalogITCase extends MySqlCatalogTestBase { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/DatabaseMetadata.java
[flink-connector-jdbc] 01/14: [FLINK-30790] Change log level to off on CI
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git commit 45306bbdeb8f056b6c3cab89e7ee27a573672025 Author: Joao Boto AuthorDate: Sat Jan 28 21:54:38 2023 +0100 [FLINK-30790] Change log level to off on CI --- tools/ci/log4j.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties index 7daf1c3..6bea562 100644 --- a/tools/ci/log4j.properties +++ b/tools/ci/log4j.properties @@ -16,7 +16,9 @@ # limitations under the License. -rootLogger.level = INFO +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF rootLogger.appenderRef.out.ref = ConsoleAppender # -
[flink-table-store] branch master updated: [FLINK-31038] Avoid accessing non-TableStore tables in HiveCatalog.listTables
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new bd6036cc [FLINK-31038] Avoid accessing non-TableStore tables in HiveCatalog.listTables bd6036cc is described below commit bd6036cc1f9cc853e04f45dac7b65b66ffdb669f Author: Jingsong Lee AuthorDate: Mon Feb 13 20:16:40 2023 +0800 [FLINK-31038] Avoid accessing non-TableStore tables in HiveCatalog.listTables This closes #528 --- .../org/apache/flink/table/store/hive/HiveCatalog.java| 15 --- .../apache/flink/table/store/hive/HiveCatalogITCase.java | 13 + 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java index 1b5deb88..90f0f9e2 100644 --- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java +++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java @@ -157,9 +157,14 @@ public class HiveCatalog extends AbstractCatalog { try { return client.getAllTables(databaseName).stream() .filter( -tableName -> -tableStoreTableExists( -new Identifier(databaseName, tableName), false)) +tableName -> { +Identifier identifier = new Identifier(databaseName, tableName); +// the environment here may not be able to access non-TableStore +// tables. 
+// so we just check the schema file first +return schemaFileExists(identifier) +&& tableStoreTableExists(identifier, false); +}) .collect(Collectors.toList()); } catch (UnknownDBException e) { throw new DatabaseNotExistException(databaseName, e); @@ -396,6 +401,10 @@ public class HiveCatalog extends AbstractCatalog { return tableStoreTableExists(identifier, true); } +private boolean schemaFileExists(Identifier identifier) { +return new SchemaManager(fileIO, getTableLocation(identifier)).latest().isPresent(); +} + private boolean tableStoreTableExists(Identifier identifier, boolean throwException) { Table table; try { diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java index f51a081c..bce1a980 100644 --- a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java +++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.table.store.connector.FlinkCatalog; import org.apache.flink.table.store.file.catalog.Catalog; import org.apache.flink.table.store.file.catalog.CatalogLock; import org.apache.flink.table.store.file.catalog.Identifier; +import org.apache.flink.table.store.fs.local.LocalFileIO; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; @@ -372,6 +373,18 @@ public class HiveCatalogITCase { String.format("Field names %s cannot contain upper case", "[A, C]")); } +@Test +public void testQuickPathInShowTables() throws Exception { +collect("CREATE TABLE t ( a INT, b STRING )"); +List tables = collect("SHOW TABLES"); +Assert.assertEquals("[+I[t]]", tables.toString()); + +new LocalFileIO() +.delete(new 
org.apache.flink.table.store.fs.Path(path, "test_db.db/t"), true); +tables = collect("SHOW TABLES"); +Assert.assertEquals("[]", tables.toString()); +} + private List collect(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) {
[flink-table-store] branch master updated (1796d3fa -> a9b33ad5)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git from 1796d3fa [FLINK-31023] Introduce ConfigOption for table store add a9b33ad5 [FLINK-31024] Copy code splitter to table store from flink table No new revisions were added by this update. Summary of changes: flink-table-store-common/pom.xml | 51 ++ .../src/main/antlr4/JavaLexer.g4 | 207 +++ .../src/main/antlr4/JavaParser.g4 | 635 + .../flink/table/store/codegen/GeneratedClass.java | 2 +- .../codesplit/AddBoolBeforeReturnRewriter.java | 183 ++ .../codegen/codesplit/BlockStatementGrouper.java | 476 +++ .../codegen/codesplit/BlockStatementRewriter.java | 251 .../codegen/codesplit/BlockStatementSplitter.java | 276 + .../codesplit/CodeRewriter.java} | 24 +- .../store/codegen/codesplit/CodeSplitUtil.java | 51 ++ .../codegen/codesplit/DeclarationRewriter.java | 269 + .../store/codegen/codesplit/FunctionSplitter.java | 270 + .../store/codegen/codesplit/JavaCodeSplitter.java | 57 ++ .../codegen/codesplit/MemberFieldRewriter.java | 349 +++ .../codegen/codesplit/ReturnAndJumpCounter.java| 38 ++ .../codegen/codesplit/ReturnValueRewriter.java | 194 +++ .../src/main/resources/META-INF/NOTICE | 16 + .../licenses/LICENSE.antlr-java-grammar-files | 26 + .../META-INF/licenses/LICENSE.antlr-runtime| 26 + .../AddBooleanBeforeReturnRewriterTest.java| 74 +++ .../codesplit/BlockStatementGrouperTest.java | 165 ++ .../codesplit/BlockStatementRewriterTest.java | 94 +++ .../codesplit/BlockStatementSplitterTest.java | 77 +++ .../codegen/codesplit/CodeRewriterTestBase.java| 83 +++ .../store/codegen/codesplit/CodeSplitTestUtil.java | 124 .../codegen/codesplit/DeclarationRewriterTest.java | 59 ++ .../codegen/codesplit/FunctionSplitterTest.java| 48 ++ .../codegen/codesplit/JavaCodeSplitterTest.java| 121 .../store/codegen/codesplit/JavaParserTest.java| 66 +++ .../codegen/codesplit/MemberFieldRewriterTest.java | 58 
++ .../codegen/codesplit/ReturnValueRewriterTest.java | 48 ++ .../code/TestAddBooleanBeforeReturn.java | 19 + .../codesplit/add-boolean/code/TestNotRewrite.java | 18 + .../add-boolean/code/TestRewriteInnerClass.java| 32 ++ .../code/TestSkipAnonymousClassAndLambda.java | 27 + .../expected/TestAddBooleanBeforeReturn.java | 21 + .../add-boolean/expected/TestNotRewrite.java | 18 + .../expected/TestRewriteInnerClass.java| 35 ++ .../expected/TestSkipAnonymousClassAndLambda.java | 28 + .../block/code/TestIfInsideWhileLoopRewrite.java | 100 .../TestIfMultipleSingleLineStatementRewrite.java | 22 + .../block/code/TestIfStatementRewrite.java | 44 ++ .../block/code/TestIfStatementRewrite1.java| 34 ++ .../block/code/TestIfStatementRewrite2.java| 32 ++ .../block/code/TestIfStatementRewrite3.java| 21 + ...ewriteIfStatementInFunctionWithReturnValue.java | 23 + .../block/code/TestRewriteInnerClass.java | 23 + .../block/code/TestRewriteTwoStatements.java | 69 +++ .../block/code/TestWhileLoopInsideIfRewrite.java | 46 ++ .../codesplit/block/code/TestWhileLoopRewrite.java | 36 ++ .../expected/TestIfInsideWhileLoopRewrite.java | 134 + .../TestIfMultipleSingleLineStatementRewrite.java | 48 ++ .../block/expected/TestIfStatementRewrite.java | 68 +++ .../block/expected/TestIfStatementRewrite1.java| 80 +++ .../block/expected/TestIfStatementRewrite2.java| 66 +++ .../block/expected/TestIfStatementRewrite3.java| 43 ++ ...ewriteIfStatementInFunctionWithReturnValue.java | 23 + .../block/expected/TestRewriteInnerClass.java | 43 ++ .../block/expected/TestRewriteTwoStatements.java | 129 + .../expected/TestWhileLoopInsideIfRewrite.java | 80 +++ .../block/expected/TestWhileLoopRewrite.java | 62 ++ ...LocalVariableAndMemberVariableWithSameName.java | 12 + .../code/TestLocalVariableWithSameName.java| 15 + ...riteLocalVariableInFunctionWithReturnValue.java | 8 + .../declaration/code/TestRewriteInnerClass.java| 17 + .../declaration/code/TestRewriteLocalVariable.java | 8 + 
.../code/TestRewriteLocalVariableInForLoop1.java | 9 + .../code/TestRewriteLocalVariableInForLoop2.java | 9 + ...LocalVariableAndMemberVariableWithSameName.java | 16 + .../expected/TestLocalVariableWithSameName.java| 22 + ...riteLocalVariableInFunctionWithReturnValue.java | 0 .../expected/TestRewriteInnerClass.java| 25 + .../expected/TestRewriteLocalVariable.java | 12 + .../TestRewriteLocalVariableInForLoop1.java|
[flink-table-store] branch master updated: [FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator may be out of order when allocating splits
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new e9875b4d [FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator may be out of order when allocating splits e9875b4d is described below commit e9875b4d0bb9d3ccd05b51349ffe7a1ea78510e0 Author: liming30 <6950+limin...@users.noreply.github.com> AuthorDate: Mon Feb 13 19:30:50 2023 +0800 [FLINK-31008] Fix the bug that ContinuousFileSplitEnumerator may be out of order when allocating splits This closes #519 --- .../source/ContinuousFileSplitEnumerator.java | 15 +- .../source/ContinuousFileSplitEnumeratorTest.java | 154 + 2 files changed, 166 insertions(+), 3 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java index 2524c30a..a33fac87 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java @@ -38,7 +38,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import static org.apache.flink.table.store.utils.Preconditions.checkArgument; @@ -52,7 +51,7 @@ public class ContinuousFileSplitEnumerator private final SplitEnumeratorContext context; -private final Map> bucketSplits; +private final Map> bucketSplits; private Long nextSnapshotId; @@ -91,6 +90,16 @@ public class ContinuousFileSplitEnumerator .add(split); } +private void addSplitsBack(Collection splits) { +new 
LinkedList<>(splits).descendingIterator().forEachRemaining(this::addSplitToHead); +} + +private void addSplitToHead(FileStoreSourceSplit split) { +bucketSplits +.computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>()) +.addFirst(split); +} + @Override public void start() { context.callAsync( @@ -121,7 +130,7 @@ public class ContinuousFileSplitEnumerator @Override public void addSplitsBack(List splits, int subtaskId) { LOG.debug("File Source Enumerator adds splits back: {}", splits); -addSplits(splits); +addSplitsBack(splits); } @Override diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java new file mode 100644 index ..dfcf0c30 --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; +import org.apache.flink.table.store.file.io.DataFileMeta; +import org.apache.flink.table.store.table.source.DataSplit; +import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; + +/** Unit tests
[flink] branch release-1.16 updated (f0e0069a741 -> 700f8839126)
This is an automated email from the ASF dual-hosted git repository. roman pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from f0e0069a741 [FLINK-31031][python] Disable the output buffer of Python process add 700f8839126 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry No new revisions were added by this update. Summary of changes: .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 41 +++--- .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 322 insertions(+), 94 deletions(-) create mode 100644 flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java copy flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestManualCacheReloadTrigger.java => flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingBatchingUploadScheduler.java (53%)
[flink] branch release-1.17 updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new c612575ed33 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry c612575ed33 is described below commit c612575ed339b319d9822dd7cbc59e3d972fe5ed Author: wangfeifan AuthorDate: Wed Feb 1 00:04:50 2023 +0800 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com> --- .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 52 .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 352 insertions(+), 75 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java index 3dd3dfcc70f..55bdfeac179 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** Base implementation of StateChangeUploader. 
*/ public abstract class AbstractStateChangeFsUploader implements StateChangeUploader { @@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader implements StateChangeUpload for (UploadTask task : tasks) { tasksOffsets.put(task, format.write(stream, task.changeSets)); } + +long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count(); + StreamStateHandle handle = stream.getHandle(handleFactory); -changelogRegistry.startTracking( -handle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(handle, numOfChangeSets); + if (stream instanceof DuplicatingOutputStreamWithPos) { StreamStateHandle localHandle = ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory); -changelogRegistry.startTracking( -localHandle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(localHandle, numOfChangeSets); return new UploadTasksResult(tasksOffsets, handle, localHandle); } // WARN: streams have to be closed before returning the results diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f72c50a9d88..83a4b2d5861 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -288,9 +288,9 @@ class FsStateChangelogWriter implements StateChangelogWriter notUsedState) { LOG.trace("Uploaded state to discard: {}", notUsedState); for (UploadResult result : notUsedState.values()) { -changelogRegistry.notUsed(result.streamStateHandle, logId); +changelogRegistry.release(result.streamStateHandle); if 
(result.localStreamHandle != null) { -changelogRegistry.notUsed(result.localStreamHandle, logId); +changelogRegistry.release(result.localStreamHandle); } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java
[flink] branch master updated: [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c0aa73df4df [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry c0aa73df4df is described below commit c0aa73df4df4e39c138f2cddaeb8efad6c831d03 Author: wangfeifan AuthorDate: Wed Feb 1 00:04:50 2023 +0800 [FLINK-28440][state/changelog] record reference count of StreamStateHandle in TaskChangelogRegistry Co-authored-by: Yanfei Lei <18653940+fre...@users.noreply.github.com> --- .../fs/AbstractStateChangeFsUploader.java | 19 +-- .../flink/changelog/fs/FsStateChangelogWriter.java | 8 +- .../flink/changelog/fs/TaskChangelogRegistry.java | 28 ++--- .../changelog/fs/TaskChangelogRegistryImpl.java| 46 --- .../fs/DiscardRecordableStateChangeUploader.java | 73 +++ .../changelog/fs/FsStateChangelogWriterTest.java | 140 + .../fs/TaskChangelogRegistryImplTest.java | 22 ++-- .../fs/TestingBatchingUploadScheduler.java | 52 .../state/changelog/ChangelogStateDiscardTest.java | 39 +++--- 9 files changed, 352 insertions(+), 75 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java index 3dd3dfcc70f..55bdfeac179 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java @@ -33,7 +33,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** Base implementation of StateChangeUploader. 
*/ public abstract class AbstractStateChangeFsUploader implements StateChangeUploader { @@ -80,22 +79,16 @@ public abstract class AbstractStateChangeFsUploader implements StateChangeUpload for (UploadTask task : tasks) { tasksOffsets.put(task, format.write(stream, task.changeSets)); } + +long numOfChangeSets = tasks.stream().flatMap(t -> t.getChangeSets().stream()).count(); + StreamStateHandle handle = stream.getHandle(handleFactory); -changelogRegistry.startTracking( -handle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(handle, numOfChangeSets); + if (stream instanceof DuplicatingOutputStreamWithPos) { StreamStateHandle localHandle = ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory); -changelogRegistry.startTracking( -localHandle, -tasks.stream() -.flatMap(t -> t.getChangeSets().stream()) -.map(StateChangeSet::getLogId) -.collect(Collectors.toSet())); +changelogRegistry.startTracking(localHandle, numOfChangeSets); return new UploadTasksResult(tasksOffsets, handle, localHandle); } // WARN: streams have to be closed before returning the results diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index f72c50a9d88..83a4b2d5861 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -288,9 +288,9 @@ class FsStateChangelogWriter implements StateChangelogWriter notUsedState) { LOG.trace("Uploaded state to discard: {}", notUsedState); for (UploadResult result : notUsedState.values()) { -changelogRegistry.notUsed(result.streamStateHandle, logId); +changelogRegistry.release(result.streamStateHandle); if 
(result.localStreamHandle != null) { -changelogRegistry.notUsed(result.localStreamHandle, logId); +changelogRegistry.release(result.localStreamHandle); } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/TaskChangelogRegistry.java
[flink] branch release-1.15 updated: [FLINK-31031][python] Disable the output buffer of Python process
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.15 by this push: new 2fa360ae387 [FLINK-31031][python] Disable the output buffer of Python process 2fa360ae387 is described below commit 2fa360ae387215455b536633808d6a82ca1e17ca Author: Dian Fu AuthorDate: Mon Feb 13 16:22:56 2023 +0800 [FLINK-31031][python] Disable the output buffer of Python process --- .../org/apache/flink/client/python/PythonDriver.java | 2 ++ .../apache/flink/client/python/PythonDriverTest.java | 20 +++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java index 57b1eb2f32d..ed439415418 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java @@ -154,6 +154,8 @@ public final class PythonDriver { */ static List constructPythonCommands(final PythonDriverOptions pythonDriverOptions) { final List commands = new ArrayList<>(); +// disable output buffer +commands.add("-u"); if (pythonDriverOptions.getEntryPointScript().isPresent()) { commands.add(pythonDriverOptions.getEntryPointScript().get()); } else { diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java index 601c68fe16b..1b14bd8b379 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java @@ -52,11 +52,12 @@ public class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions("xxx", null, args); List commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions); // verify the generated commands -Assert.assertEquals(4, commands.size()); -Assert.assertEquals(commands.get(0), "-m"); -Assert.assertEquals(commands.get(1), "xxx"); -Assert.assertEquals(commands.get(2), "--input"); -Assert.assertEquals(commands.get(3), "in.txt"); +Assert.assertEquals(5, commands.size()); +Assert.assertEquals(commands.get(0), "-u"); +Assert.assertEquals(commands.get(1), "-m"); +Assert.assertEquals(commands.get(2), "xxx"); +Assert.assertEquals(commands.get(3), "--input"); +Assert.assertEquals(commands.get(4), "in.txt"); } @Test @@ -67,9 +68,10 @@ public class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions(null, "xxx.py", args); List commands = PythonDriver.constructPythonCommands(pythonDriverOptions); -Assert.assertEquals(3, commands.size()); -Assert.assertEquals(commands.get(0), "xxx.py"); -Assert.assertEquals(commands.get(1), "--input"); -Assert.assertEquals(commands.get(2), "in.txt"); +Assert.assertEquals(4, commands.size()); +Assert.assertEquals(commands.get(0), "-u"); +Assert.assertEquals(commands.get(1), "xxx.py"); +Assert.assertEquals(commands.get(2), "--input"); +Assert.assertEquals(commands.get(3), "in.txt"); } }
[flink-web] 02/02: Rebuild website.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit ffaf15279d4d1e73e439248e1338623decaaa762 Author: Weijie Guo AuthorDate: Mon Feb 13 16:36:17 2023 +0800 Rebuild website. --- content/community.html| 18 -- content/zh/community.html | 18 -- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/content/community.html b/content/community.html index bacb8a38a..aac814165 100644 --- a/content/community.html +++ b/content/community.html @@ -647,6 +647,18 @@ Thanks, PMC, Committer gates + +https://avatars3.githubusercontent.com/u/19502505?s=50; class="committer-avatar" /> +Weijie Guo +Committer +guoweijie + + +https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar" /> +Yangze Guo +Committer +guoyangze + https://avatars1.githubusercontent.com/u/569655?s=50; class="committer-avatar" /> Greg Hogan @@ -839,12 +851,6 @@ Thanks, Committer fengwang - -https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar" /> -Yangze Guo -Committer -guoyangze - https://avatars3.githubusercontent.com/u/9400874?s=50; class="committer-avatar" /> Shaoxuan Wang diff --git a/content/zh/community.html b/content/zh/community.html index 5f70ff664..9769d800f 100644 --- a/content/zh/community.html +++ b/content/zh/community.html @@ -638,6 +638,18 @@ Thanks, PMC, Committer gates + +https://avatars3.githubusercontent.com/u/19502505?s=50; class="committer-avatar" /> +Weijie Guo +Committer +guoweijie + + +https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar" /> +Yangze Guo +Committer +guoyangze + https://avatars1.githubusercontent.com/u/569655?s=50; class="committer-avatar" /> Greg Hogan @@ -830,12 +842,6 @@ Thanks, Committer fengwang - -https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar" /> -Yangze Guo -Committer -guoyangze - 
https://avatars3.githubusercontent.com/u/9400874?s=50; class="committer-avatar" /> Shaoxuan Wang
[flink-web] branch asf-site updated (7a0c404e3 -> ffaf15279)
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git from 7a0c404e3 [hotfix] Setup staging deployment new ff9daa386 Add Weijie Guo to the committer list and fix the incorrect order for Yangze Guo. new ffaf15279 Rebuild website. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: community.md | 18 -- community.zh.md | 18 -- content/community.html| 18 -- content/zh/community.html | 18 -- 4 files changed, 48 insertions(+), 24 deletions(-)
[flink-web] 01/02: Add Weijie Guo to the committer list and fix the incorrect order for Yangze Guo.
This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit ff9daa38688562444bbd4c86ba757a7ddd235ef9 Author: Weijie Guo AuthorDate: Mon Feb 13 12:32:27 2023 +0800 Add Weijie Guo to the committer list and fix the incorrect order for Yangze Guo. This closes #607 --- community.md| 18 -- community.zh.md | 18 -- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/community.md b/community.md index 5915b4ea7..003e6db80 100644 --- a/community.md +++ b/community.md @@ -363,6 +363,18 @@ The list below could be outdated. Please find the most up-to-date list PMC, Committer gates + +https://avatars3.githubusercontent.com/u/19502505?s=50; class="committer-avatar"> +Weijie Guo +Committer +guoweijie + + +https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar"> +Yangze Guo +Committer +guoyangze + https://avatars1.githubusercontent.com/u/569655?s=50; class="committer-avatar"> Greg Hogan @@ -555,12 +567,6 @@ The list below could be outdated. 
Please find the most up-to-date list Committer fengwang - -https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar"> -Yangze Guo -Committer -guoyangze - https://avatars3.githubusercontent.com/u/9400874?s=50; class="committer-avatar"> Shaoxuan Wang diff --git a/community.zh.md b/community.zh.md index 6b953b941..c07b24fcc 100644 --- a/community.zh.md +++ b/community.zh.md @@ -356,6 +356,18 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最 PMC, Committer gates + +https://avatars3.githubusercontent.com/u/19502505?s=50; class="committer-avatar"> +Weijie Guo +Committer +guoweijie + + +https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar"> +Yangze Guo +Committer +guoyangze + https://avatars1.githubusercontent.com/u/569655?s=50; class="committer-avatar"> Greg Hogan @@ -548,12 +560,6 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最 Committer fengwang - -https://avatars3.githubusercontent.com/u/8684799?s=50; class="committer-avatar"> -Yangze Guo -Committer -guoyangze - https://avatars3.githubusercontent.com/u/9400874?s=50; class="committer-avatar"> Shaoxuan Wang
[flink] branch release-1.16 updated (26a8fe566b7 -> f0e0069a741)
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 26a8fe566b7 [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest. add f0e0069a741 [FLINK-31031][python] Disable the output buffer of Python process No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++ .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-)
[flink] branch release-1.17 updated: [FLINK-31031][python] Disable the output buffer of Python process
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.17 by this push: new 5344b8a59af [FLINK-31031][python] Disable the output buffer of Python process 5344b8a59af is described below commit 5344b8a59afbb17740ee363f22fe79fe0d5d50b2 Author: Dian Fu AuthorDate: Mon Feb 13 16:22:56 2023 +0800 [FLINK-31031][python] Disable the output buffer of Python process --- .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++ .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java index 57b1eb2f32d..ed439415418 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java @@ -154,6 +154,8 @@ public final class PythonDriver { */ static List constructPythonCommands(final PythonDriverOptions pythonDriverOptions) { final List commands = new ArrayList<>(); +// disable output buffer +commands.add("-u"); if (pythonDriverOptions.getEntryPointScript().isPresent()) { commands.add(pythonDriverOptions.getEntryPointScript().get()); } else { diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java index 6579ce0d9d5..d8b174e7f3b 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java @@ -53,7 +53,7 @@ class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions("xxx", null, args); List commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions); // verify the generated commands -assertThat(commands).containsExactly("-m", "xxx", "--input", "in.txt"); +assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", "in.txt"); } @Test @@ -64,6 +64,6 @@ class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions(null, "xxx.py", args); List commands = PythonDriver.constructPythonCommands(pythonDriverOptions); -assertThat(commands).containsExactly("xxx.py", "--input", "in.txt"); +assertThat(commands).containsExactly("-u", "xxx.py", "--input", "in.txt"); } }
[flink] branch master updated: [FLINK-31031][python] Disable the output buffer of Python process
This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e22f06130a8 [FLINK-31031][python] Disable the output buffer of Python process e22f06130a8 is described below commit e22f06130a8c663ab454586102c88ba403beee56 Author: Dian Fu AuthorDate: Mon Feb 13 16:22:56 2023 +0800 [FLINK-31031][python] Disable the output buffer of Python process --- .../src/main/java/org/apache/flink/client/python/PythonDriver.java| 2 ++ .../test/java/org/apache/flink/client/python/PythonDriverTest.java| 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java index 57b1eb2f32d..ed439415418 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java @@ -154,6 +154,8 @@ public final class PythonDriver { */ static List constructPythonCommands(final PythonDriverOptions pythonDriverOptions) { final List commands = new ArrayList<>(); +// disable output buffer +commands.add("-u"); if (pythonDriverOptions.getEntryPointScript().isPresent()) { commands.add(pythonDriverOptions.getEntryPointScript().get()); } else { diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java index 6579ce0d9d5..d8b174e7f3b 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java @@ -53,7 +53,7 @@ class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions("xxx", null, args); List commands = 
PythonDriver.constructPythonCommands(pythonDriverOptions); // verify the generated commands -assertThat(commands).containsExactly("-m", "xxx", "--input", "in.txt"); +assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", "in.txt"); } @Test @@ -64,6 +64,6 @@ class PythonDriverTest { PythonDriverOptions pythonDriverOptions = new PythonDriverOptions(null, "xxx.py", args); List commands = PythonDriver.constructPythonCommands(pythonDriverOptions); -assertThat(commands).containsExactly("xxx.py", "--input", "in.txt"); +assertThat(commands).containsExactly("-u", "xxx.py", "--input", "in.txt"); } }
[flink] branch release-1.16 updated (0c0f04769df -> 26a8fe566b7)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git from 0c0f04769df [hotfix][doc] Fix typo in elastic_scaling.md add 080ddecbb18 [hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ. add d4242f11bed [hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor. add 26a8fe566b7 [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest. No new revisions were added by this update. Summary of changes: .../ExecutionGraphPartitionReleaseTest.java| 70 ++ .../TestingComponentMainThreadExecutor.java| 39 2 files changed, 70 insertions(+), 39 deletions(-)
[flink] 01/03: [hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit fa85ca20ac5919c52a548a069551d04b00885db1 Author: Weijie Guo AuthorDate: Wed Feb 8 11:26:26 2023 +0800 [hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ. --- .../ExecutionGraphPartitionReleaseTest.java| 56 ++ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index 29711bc9956..5ff3af15b9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -33,31 +33,27 @@ import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorResource; -import org.apache.flink.util.TestLogger; +import org.apache.flink.testutils.executor.TestExecutorExtension; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.IsEqual.equalTo; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests for the interactions of the {@link ExecutionGraph} 
and {@link * PartitionGroupReleaseStrategy}. */ -public class ExecutionGraphPartitionReleaseTest extends TestLogger { +class ExecutionGraphPartitionReleaseTest { -@ClassRule -public static final TestExecutorResource EXECUTOR_RESOURCE = -TestingUtils.defaultExecutorResource(); +@RegisterExtension +public static final TestExecutorExtension EXECUTOR_EXTENSION = +TestingUtils.defaultExecutorExtension(); private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); @@ -67,7 +63,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { scheduledExecutorService)); @Test -public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception { +void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception { // setup a simple pipeline of 3 operators with blocking partitions final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1); final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1); @@ -98,7 +94,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { scheduler.updateTaskExecutionState( new TaskExecutionState( sourceExecution.getAttemptId(), ExecutionState.FINISHED)); -assertThat(releasedPartitions, empty()); +assertThat(releasedPartitions).isEmpty(); }); mainThreadExecutor.execute( @@ -110,10 +106,9 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { scheduler.updateTaskExecutionState( new TaskExecutionState( operatorExecution.getAttemptId(), ExecutionState.FINISHED)); -assertThat(releasedPartitions, hasSize(1)); -assertThat( -releasedPartitions.remove(), -equalTo( +assertThat(releasedPartitions).hasSize(1); +assertThat(releasedPartitions.remove()) +.isEqualTo( new ResultPartitionID( sourceExecution .getVertex() @@ -121,7 +116,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { .keySet() .iterator() .next(), -sourceExecution.getAttemptId(; +
[flink] branch release-1.17 updated (19c05ef0c86 -> a5536f2f2f5)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git from 19c05ef0c86 [FLINK-30969][python] Remove the __init__.py files under the examples new fa85ca20ac5 [hotfix] Migrate ExecutionGraphPartitionReleaseTest to Junit5 and AssertJ. new 8e8aa2aac16 [hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor. new a5536f2f2f5 [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest. The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../ExecutionGraphPartitionReleaseTest.java| 70 ++ .../TestingComponentMainThreadExecutor.java| 39 2 files changed, 70 insertions(+), 39 deletions(-)
[flink] 03/03: [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit a5536f2f2f5b0574d8e8ecb768afd049667a1fba Author: Weijie Guo AuthorDate: Wed Feb 8 11:43:35 2023 +0800 [FLINK-30944] Fix thread leak in ExecutionGraphPartitionReleaseTest. --- .../executiongraph/ExecutionGraphPartitionReleaseTest.java | 14 ++ 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index 5ff3af15b9e..ca9fbfbb1ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -17,7 +17,6 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; @@ -40,7 +39,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.assertj.core.api.Assertions.assertThat; @@ -55,12 +53,12 @@ class ExecutionGraphPartitionReleaseTest { public static final TestExecutorExtension EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension(); -private static final ScheduledExecutorService scheduledExecutorService = -Executors.newSingleThreadScheduledExecutor(); -private static final TestingComponentMainThreadExecutor 
mainThreadExecutor = -new TestingComponentMainThreadExecutor( - ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( -scheduledExecutorService)); +@RegisterExtension +public static final TestingComponentMainThreadExecutor.Extension MAIN_THREAD_EXTENSION = +new TestingComponentMainThreadExecutor.Extension(); + +private final TestingComponentMainThreadExecutor mainThreadExecutor = +MAIN_THREAD_EXTENSION.getComponentMainThreadTestExecutor(); @Test void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception {
[flink] 02/03: [hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor.
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 8e8aa2aac165620529ef674e7dc5d8e39ddfeae0 Author: Weijie Guo AuthorDate: Wed Feb 8 11:42:11 2023 +0800 [hotfix] Introduce Junit5 extension for TestingComponentMainThreadExecutor. --- .../TestingComponentMainThreadExecutor.java| 39 ++ 1 file changed, 39 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java index 10d4b17ec99..c767fc14d1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingComponentMainThreadExecutor.java @@ -25,6 +25,9 @@ import org.apache.flink.util.function.FunctionUtils; import org.apache.flink.util.function.SupplierWithException; import org.apache.flink.util.function.ThrowingRunnable; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.rules.ExternalResource; import javax.annotation.Nonnull; @@ -103,4 +106,40 @@ public class TestingComponentMainThreadExecutor { return componentMainThreadTestExecutor; } } + +/** Test extension for convenience. 
*/ +public static class Extension implements BeforeAllCallback, AfterAllCallback { +private final long shutdownTimeoutMillis; + +private TestingComponentMainThreadExecutor componentMainThreadTestExecutor; + +private ScheduledExecutorService innerExecutorService; + +public Extension() { +this(500L); +} + +public Extension(long shutdownTimeoutMillis) { +this.shutdownTimeoutMillis = shutdownTimeoutMillis; +} + +@Override +public void beforeAll(ExtensionContext extensionContext) throws Exception { +this.innerExecutorService = Executors.newSingleThreadScheduledExecutor(); +this.componentMainThreadTestExecutor = +new TestingComponentMainThreadExecutor( + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +innerExecutorService)); +} + +@Override +public void afterAll(ExtensionContext extensionContext) throws Exception { +ExecutorUtils.gracefulShutdown( +shutdownTimeoutMillis, TimeUnit.MILLISECONDS, innerExecutorService); +} + +public TestingComponentMainThreadExecutor getComponentMainThreadTestExecutor() { +return componentMainThreadTestExecutor; +} +} }