This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new a826fe8d501 [FLINK-26726][hive]Hive enumerators do not assign splits to unregistered (failed) readers a826fe8d501 is described below commit a826fe8d501ed8bdd9cbdc5febf7aac4cfa0b947 Author: zoucao <32817398+zou...@users.noreply.github.com> AuthorDate: Fri Oct 14 10:27:01 2022 +0800 [FLINK-26726][hive]Hive enumerators do not assign splits to unregistered (failed) readers This closes #21038 --- flink-connectors/flink-connector-hive/pom.xml | 7 + .../hive/ContinuousHiveSplitEnumerator.java | 8 + .../hive/ContinuousHiveSplitEnumeratorTest.java | 194 +++++++++++++++++++++ 3 files changed, 209 insertions(+) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 28db2b0cc31..469c731cc79 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -858,6 +858,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- The derby we use suffers some security vulnerabilities. Explicitly set it to test scope here. --> <dependency> <groupId>org.apache.derby</groupId> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java index a061b27d576..22ee60dce4e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java @@ -167,6 +167,14 @@ public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> readersAwaitingSplit.entrySet().iterator(); while (awaitingReader.hasNext()) { final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next(); + + // if the reader that requested another split has failed in the meantime, remove + // it from the list of waiting readers + if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) { + awaitingReader.remove(); + continue; + } + final String hostname = nextAwaiting.getValue(); final int awaitingSubtask = nextAwaiting.getKey(); final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java new file mode 100644 index 00000000000..828f6c3c477 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner; +import org.apache.flink.connector.file.table.ContinuousPartitionFetcher; +import org.apache.flink.connector.file.table.PartitionFetcher; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; +import org.apache.flink.connectors.hive.read.HiveSourceSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.catalog.ObjectPath; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link ContinuousHiveSplitEnumerator}. */ +public class ContinuousHiveSplitEnumeratorTest { + + @Test + public void testDiscoverSplitWhenNoReaderRegistered() throws Exception { + final TestingSplitEnumeratorContext<HiveSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + final HiveSourceSplit split = createSplit(); + final ContinuousHiveSplitEnumerator<Long> enumerator = + new ContinuousHiveSplitEnumerator( + context, + 0L, + Collections.emptySet(), + new SimpleSplitAssigner(Collections.singletonList(split)), + 1000L, + 1, + new JobConf(), + new ObjectPath("testDb", "testTable"), + mockPartitionFetcher(), + new MockHiveContinuousPartitionFetcherContext( + new ObjectPath("testDb", "testTable"))); + enumerator.start(); + context.triggerAllActions(); + + assertThat(enumerator.snapshotState(1L).getSplits()).contains(split); + } + + @Test + public void testDiscoverWhenReaderRegistered() throws Exception { + final TestingSplitEnumeratorContext<HiveSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + + final ContinuousHiveSplitEnumerator<Long> enumerator = + new ContinuousHiveSplitEnumerator( + context, + 0L, + Collections.emptySet(), + new SimpleSplitAssigner(Collections.emptyList()), + 1000L, + 1, + new JobConf(), + new ObjectPath("testDb", "testTable"), + mockPartitionFetcher(), + new MockHiveContinuousPartitionFetcherContext( + new ObjectPath("testDb", "testTable"))); + enumerator.start(); + // register one reader, and let it request a split + context.registerReader(2, "localhost"); + enumerator.addReader(2); + enumerator.handleSplitRequest(2, "localhost"); + final HiveSourceSplit split = createSplit(); + enumerator.addSplitsBack(Collections.singletonList(split), 0); + context.triggerAllActions(); + + assertThat(enumerator.snapshotState(1L).getSplits()).isEmpty(); + assertThat(context.getSplitAssignments().get(2).getAssignedSplits()).contains(split); + } + + @Test + public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception { + final TestingSplitEnumeratorContext<HiveSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + + final ContinuousHiveSplitEnumerator<Long> enumerator = + new ContinuousHiveSplitEnumerator( + context, + 0L, + Collections.emptySet(), + new SimpleSplitAssigner(Collections.emptyList()), + 1000L, + 1, + new JobConf(), + new ObjectPath("testDb", "testTable"), + mockPartitionFetcher(), + new MockHiveContinuousPartitionFetcherContext( + new ObjectPath("testDb", "testTable"))); + enumerator.start(); + // register one reader, and let it request a split + context.registerReader(2, "localhost"); + enumerator.addReader(2); + enumerator.handleSplitRequest(2, "localhost"); + + // remove the reader (like in a failure) + context.registeredReaders().remove(2); + final HiveSourceSplit split = createSplit(); + enumerator.addSplitsBack(Collections.singletonList(split), 0); + context.triggerAllActions(); + + assertThat(context.getSplitAssignments()).doesNotContainKey(2); + assertThat(enumerator.snapshotState(1L).getSplits()).contains(split); + } + + private HiveSourceSplit createSplit() { + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("/tmp"); + return new HiveSourceSplit( + "1", + new Path("/tmp"), + 0, + 0, + 0, + 0, + new String[] {"host1"}, + null, + new HiveTablePartition(sd, new HashMap<>(), new Properties())); + } + + private ContinuousPartitionFetcher<Partition, Long> mockPartitionFetcher() { + return new ContinuousPartitionFetcher<Partition, Long>() { + + private static final long serialVersionUID = 1L; + + @Override + public List<Tuple2<Partition, Long>> fetchPartitions( + Context<Partition, Long> context, Long previousOffset) throws Exception { + return Collections.emptyList(); + } + + @Override + public List<Partition> fetch(PartitionFetcher.Context<Partition> context) + throws Exception { + return Collections.emptyList(); + } + }; + } + + private static class MockHiveContinuousPartitionFetcherContext + extends HiveTableSource.HiveContinuousPartitionFetcherContext<Long> { + + public MockHiveContinuousPartitionFetcherContext(ObjectPath tablePath) { + super(tablePath, null, null, null, null, null, new Configuration(), "default"); + } + + @Override + public void close() throws Exception {} + + @Override + public void open() throws Exception {} + + @Override + public Optional<Partition> getPartition(List<String> partValues) throws TException { + return Optional.empty(); + } + + @Override + public HiveTablePartition toHiveTablePartition(Partition partition) { + return new HiveTablePartition(partition.getSd(), new HashMap<>(), new Properties()); + } + } +}