This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new b30a502c53e [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started. b30a502c53e is described below commit b30a502c53eaca95630eddd03022871f17fdf299 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Sep 27 15:49:29 2022 +0800 [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started. This closes #20906. --- .../flink/connectors/hive/HiveDialectITCase.java | 135 ------------ .../hive/HiveDynamicPartitionPruningITCase.java | 234 +++++++++++++++++++++ .../flink/table/catalog/hive/HiveTestUtils.java | 19 ++ .../operators/coordination/CoordinatorStore.java | 4 + .../coordination/CoordinatorStoreImpl.java | 5 + .../source/coordinator/SourceCoordinator.java | 37 +++- ...cFilteringDataCollectorOperatorCoordinator.java | 58 +++-- 7 files changed, 334 insertions(+), 158 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 81487871c78..32548dd731a 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.hive; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; import org.apache.flink.table.HiveVersionTestUtil; import org.apache.flink.table.api.SqlDialect; @@ -26,7 +25,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -37,7 +35,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; -import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.functions.hive.HiveGenericUDTFTest; @@ -1287,138 +1284,6 @@ public class HiveDialectITCase { } } - @Test - public void testDynamicPartitionPruning() throws Exception { - // src table - tableEnv.executeSql("create table dim (x int,y string,z int)"); - tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await(); - - // partitioned dest table - tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)"); - tableEnv.executeSql( - "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact partition (p=3) values 
(30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ") - .await(); - - System.out.println( - tableEnv.explainSql( - "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a")); - - tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4); - tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false); - - String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"; - String sqlSwapFactDim = - "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a"; - - String expected = - "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " - + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]"; - - // Check dynamic partition pruning is working - String plan = tableEnv.explainSql(sql); - assertThat(plan).contains("DynamicFilteringDataCollector"); - - plan = tableEnv.explainSql(sqlSwapFactDim); - assertThat(plan).contains("DynamicFilteringDataCollector"); - - // Validate results - List<Row> results = queryResult(tableEnv.sqlQuery(sql)); - assertThat(results.toString()).isEqualTo(expected); - - results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim)); - assertThat(results.toString()).isEqualTo(expected); - - // Validate results with table statistics - tableEnv.getCatalog(tableEnv.getCurrentCatalog()) - .get() - .alterTableStatistics( - new ObjectPath(tableEnv.getCurrentDatabase(), "dim"), - new CatalogTableStatistics(3, -1, -1, -1), - false); - - results = queryResult(tableEnv.sqlQuery(sql)); - assertThat(results.toString()).isEqualTo(expected); - - results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim)); - assertThat(results.toString()).isEqualTo(expected); - } - - @Test - public void testDynamicPartitionPruningOnTwoFactTables() throws Exception { - tableEnv.executeSql("create table dim (x int,y string,z int)"); - tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await(); - - // partitioned dest table - tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)"); - tableEnv.executeSql( - "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ") - .await(); - - // partitioned dest table - tableEnv.executeSql( - "create table fact2 (a int, b bigint, c string) partitioned by (p int)"); - tableEnv.executeSql( - "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ") - .await(); - tableEnv.executeSql( - "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ") - .await(); - - tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4); - - tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false); - - // two fact sources share the same dynamic filter - String sql = - "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) " - + "union all " - + "(select a, b, c, p, x, y from fact2, dim where x = p and 
z = 1)) t order by a"; - String expected = - "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " - + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], " - + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], " - + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]"; - - String plan = tableEnv.explainSql(sql); - assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id="); - - List<Row> results = queryResult(tableEnv.sqlQuery(sql)); - assertThat(results.toString()).isEqualTo(expected); - - // two fact sources use different dynamic filters - String sql2 = - "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) " - + "union all " - + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a"; - String expected2 = - "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " - + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], " - + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]"; - - plan = tableEnv.explainSql(sql2); - assertThat(plan).contains("DynamicFilteringDataCollector"); - - results = queryResult(tableEnv.sqlQuery(sql2)); - assertThat(results.toString()).isEqualTo(expected2); - } - private void verifyUnsupportedOperation(String ddl) { assertThatThrownBy(() -> tableEnv.executeSql(ddl)) .isInstanceOf(ValidationException.class) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java new file mode 100644 index 00000000000..dce66345c24 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.SqlDialect; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.FileUtils; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests the dynamic partition pruning optimization on Hive sources. */ +@ExtendWith(ParameterizedTestExtension.class) +public class HiveDynamicPartitionPruningITCase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + + @Parameter public boolean enableAdaptiveBatchScheduler; + + @Parameters(name = "enableAdaptiveBatchScheduler={0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + private TableEnvironment tableEnv; + private HiveCatalog hiveCatalog; + private String warehouse; + + @BeforeEach + public void setup() { + hiveCatalog = HiveTestUtils.createHiveCatalog(); + hiveCatalog + .getHiveConf() + .setBoolVar( + HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false); + hiveCatalog.open(); + warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + + if (enableAdaptiveBatchScheduler) { + tableEnv = HiveTestUtils.createTableEnvInBatchModeWithAdaptiveScheduler(); + } else { + tableEnv = HiveTestUtils.createTableEnvInBatchMode(); + } + + tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); + tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + tableEnv.useCatalog(hiveCatalog.getName()); + } + + @AfterEach + public void tearDown() { + if (hiveCatalog != null) { + hiveCatalog.close(); + } + if (warehouse != null) { + FileUtils.deleteDirectoryQuietly(new File(warehouse)); + } + } + + @TestTemplate + public void testDynamicPartitionPruning() throws Exception { + // src table + tableEnv.executeSql("create table dim (x int,y string,z int)"); + tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await(); + + // partitioned dest table + tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)"); + tableEnv.executeSql( + "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact partition (p=2) values 
(20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ") + .await(); + + System.out.println( + tableEnv.explainSql( + "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a")); + + tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false); + + String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"; + String sqlSwapFactDim = + "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a"; + + String expected = + "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " + + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]"; + + // Check dynamic partition pruning is working + String plan = tableEnv.explainSql(sql); + assertThat(plan).contains("DynamicFilteringDataCollector"); + + plan = tableEnv.explainSql(sqlSwapFactDim); + assertThat(plan).contains("DynamicFilteringDataCollector"); + + // Validate results + List<Row> results = queryResult(tableEnv.sqlQuery(sql)); + assertThat(results.toString()).isEqualTo(expected); + + results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim)); + assertThat(results.toString()).isEqualTo(expected); + + // Validate results with table statistics + tableEnv.getCatalog(tableEnv.getCurrentCatalog()) + .get() + .alterTableStatistics( + new ObjectPath(tableEnv.getCurrentDatabase(), "dim"), + new CatalogTableStatistics(3, -1, -1, -1), + false); + + results = queryResult(tableEnv.sqlQuery(sql)); + assertThat(results.toString()).isEqualTo(expected); + + results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim)); + assertThat(results.toString()).isEqualTo(expected); + } + + @TestTemplate + public void testDynamicPartitionPruningOnTwoFactTables() throws Exception { + tableEnv.executeSql("create table dim (x int,y string,z int)"); + tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await(); + + // partitioned dest table + tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)"); + tableEnv.executeSql( + "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ") + .await(); + + // partitioned dest table + tableEnv.executeSql( + "create table fact2 (a int, b bigint, c string) partitioned by (p int)"); + tableEnv.executeSql( + "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ") + .await(); + tableEnv.executeSql( + "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ") + .await(); + + tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false); + + // two fact sources share the same dynamic filter + String sql = + "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) " + + "union all " + + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a"; + String expected = + "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " + + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], 
+I[22, 202, ccc, 2, 2, b], " + + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], " + + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]"; + + String plan = tableEnv.explainSql(sql); + assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id="); + + List<Row> results = queryResult(tableEnv.sqlQuery(sql)); + assertThat(results.toString()).isEqualTo(expected); + + // two fact sources use different dynamic filters + String sql2 = + "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) " + + "union all " + + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a"; + String expected2 = + "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], " + + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], " + + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]"; + + plan = tableEnv.explainSql(sql2); + assertThat(plan).contains("DynamicFilteringDataCollector"); + + results = queryResult(tableEnv.sqlQuery(sql2)); + assertThat(results.toString()).isEqualTo(expected2); + } + + private static List<Row> queryResult(org.apache.flink.table.api.Table table) { + return CollectionUtil.iteratorToList(table.execute().collect()); + } +} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java index 65bd028c349..266b4097799 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -18,6 +18,9 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.sql.parser.SqlPartitionUtils; import org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions; import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl; @@ -158,6 +161,22 @@ public class HiveTestUtils { return tableEnv; } + public static TableEnvironment createTableEnvInBatchModeWithAdaptiveScheduler() { + EnvironmentSettings settings = EnvironmentSettings.inBatchMode(); + settings.getConfiguration() + .set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch); + settings.getConfiguration() + .set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4); + settings.getConfiguration() + .set( + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK, + MemorySize.parse("150kb")); + settings.getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, -1); + TableEnvironment tableEnv = TableEnvironment.create(settings); + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + return tableEnv; + } + public static StreamTableEnvironment createTableEnvInStreamingMode( StreamExecutionEnvironment env) { return createTableEnvInStreamingMode(env, SqlDialect.DEFAULT); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java index ce0a5192602..90552e0adc7 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java @@ -29,6 +29,8 @@ import java.util.function.Function; * {@link CoordinatorStore} can be used for sharing some information among {@link * OperatorCoordinator} instances. Motivating example is/was combining/aggregating latest watermark * emitted by different sources in order to do the watermark alignment. + * + * <p>Implementations of this interface must ensure that all operations are atomic. */ @ThreadSafe @Internal @@ -41,5 +43,7 @@ public interface CoordinatorStore { Object computeIfPresent(Object key, BiFunction<Object, Object, Object> remappingFunction); + Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction); + <R> R apply(Object key, Function<Object, R> consumer); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java index 72daf833e1a..6956d94006c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java @@ -54,6 +54,11 @@ public class CoordinatorStoreImpl implements CoordinatorStore { return store.computeIfPresent(key, remappingFunction); } + @Override + public Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction) { + return store.compute(key, mappingFunction); + } + @Override public <R> R apply(Object key, Function<Object, R> consumer) { return consumer.apply(store.get(key)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index c4083f9d173..842b768050d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -225,13 +225,36 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator."); if (coordinatorListeningID != null) { - if (coordinatorStore.containsKey(coordinatorListeningID)) { - // The coordinator will be recreated after global failover. It should be registered - // again replacing the previous one. - coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this); - } else { - coordinatorStore.putIfAbsent(coordinatorListeningID, this); - } + coordinatorStore.compute( + coordinatorListeningID, + (key, oldValue) -> { + // The value for a listener ID can be a source coordinator listening to an + // event, or an event waiting to be retrieved + if (oldValue == null || oldValue instanceof OperatorCoordinator) { + // The coordinator has not registered or needs to be recreated after + // global failover. 
+ return this; + } else { + checkState( + oldValue instanceof OperatorEvent, + "The existing value for " + + coordinatorStore + + "is expected to be an operator event, but it is in fact " + + oldValue); + LOG.info( + "Handling event {} received before the source coordinator with ID {} is registered", + oldValue, + coordinatorListeningID); + handleEventFromOperator(0, 0, (OperatorEvent) oldValue); + + // Since for non-global failover the coordinator will not be recreated + // and for global failover both the sender and receiver need to restart, + // the coordinator will receive the event only once. + // As the event has been processed, it can be removed safely and there's + // no need to register the coordinator for further events as well. + return null; + } + }); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java index 50b10768765..bc0b6219e37 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCo import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.table.connector.source.DynamicFilteringData; import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator @@ -68,8 +70,7 @@ public class DynamicFilteringDataCollectorOperatorCoordinator public void close() throws Exception {} @Override - public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) - throws Exception { + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { DynamicFilteringData currentData = ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData(); if (receivedFilteringData == null) { @@ -92,20 +93,45 @@ public class DynamicFilteringDataCollectorOperatorCoordinator } for (String listenerID : dynamicFilteringDataListenerIDs) { - // Push event to listening source coordinators. - OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID); - if (listener == null) { - throw new IllegalStateException( - "Dynamic filtering data listener is missing: " + listenerID); - } else { - LOG.info( - "Distributing event {} to source coordinator with ID {}", - event, - listenerID); - // Subtask index and attempt number is not necessary for handling - // DynamicFilteringEvent. 
- listener.handleEventFromOperator(0, 0, event); - } + coordinatorStore.compute( + listenerID, + (key, oldValue) -> { + // The value for a listener ID can be a source coordinator listening to an + // event, or an event waiting to be retrieved + if (oldValue == null || oldValue instanceof OperatorEvent) { + // If the listener has not been registered, or after a global failover + // without cleanup the store, we simply update it to the latest value. + // The listener coordinator would retrieve the event once it's started. + LOG.info( + "Updating event {} before the source coordinator with ID {} is registered", + event, + listenerID); + return event; + } else { + checkState( + oldValue instanceof OperatorCoordinator, + "The existing value for " + + listenerID + + "is expected to be an operator coordinator, but it is in fact " + + oldValue); + LOG.info( + "Distributing event {} to source coordinator with ID {}", + event, + listenerID); + try { + // Subtask index and attempt number is not necessary for handling + // DynamicFilteringEvent. + ((OperatorCoordinator) oldValue) + .handleEventFromOperator(0, 0, event); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + + // Dynamic filtering event is expected to be sent only once. So after + // the coordinator is notified, it can be removed from the store. + return null; + } + }); } }