This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 81d2c1b340f [FLINK-26726][hive] Hive enumerators do not assign splits to unregistered (failed) readers
81d2c1b340f is described below

commit 81d2c1b340f3a4d063e97db2519d6911028d807d
Author: zoucao <32817398+zou...@users.noreply.github.com>
AuthorDate: Fri Oct 14 10:27:42 2022 +0800

    [FLINK-26726][hive] Hive enumerators do not assign splits to unregistered (failed) readers
    
    This closes #21044
---
 flink-connectors/flink-connector-hive/pom.xml      |   7 +
 .../hive/ContinuousHiveSplitEnumerator.java        |   8 +
 .../hive/ContinuousHiveSplitEnumeratorTest.java    | 192 +++++++++++++++++++++
 3 files changed, 207 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 0d63f808bfd..73c06a170bd 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -846,6 +846,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- The derby we use suffers some security vulnerabilities. Explicitly set it to test scope here. -->
                <dependency>
                        <groupId>org.apache.derby</groupId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index d5884d1f224..c8c3cfc73ab 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -165,6 +165,14 @@ public class ContinuousHiveSplitEnumerator<T extends Comparable<T>>
                 readersAwaitingSplit.entrySet().iterator();
         while (awaitingReader.hasNext()) {
             final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+
+            // if the reader that requested another split has failed in the meantime, remove
+            // it from the list of waiting readers
+            if (!enumeratorContext.registeredReaders().containsKey(nextAwaiting.getKey())) {
+                awaitingReader.remove();
+                continue;
+            }
+
             final String hostname = nextAwaiting.getValue();
             final int awaitingSubtask = nextAwaiting.getKey();
             final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java
new file mode 100644
index 00000000000..d8f5417cad9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumeratorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+import org.apache.flink.table.filesystem.PartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/** Unit tests for the {@link ContinuousHiveSplitEnumerator}. */
+public class ContinuousHiveSplitEnumeratorTest {
+
+    @Test
+    public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
+        final TestingSplitEnumeratorContext<HiveSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        final HiveSourceSplit split = createSplit();
+        final ContinuousHiveSplitEnumerator<Long> enumerator =
+                new ContinuousHiveSplitEnumerator(
+                        context,
+                        0L,
+                        Collections.emptySet(),
+                        new SimpleSplitAssigner(Collections.singletonList(split)),
+                        1000L,
+                        new JobConf(),
+                        new ObjectPath("testDb", "testTable"),
+                        mockPartitionFetcher(),
+                        new MockHiveContinuousPartitionFetcherContext(
+                                new ObjectPath("testDb", "testTable")));
+        enumerator.start();
+        context.triggerAllActions();
+
+        assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
+    }
+
+    @Test
+    public void testDiscoverWhenReaderRegistered() throws Exception {
+        final TestingSplitEnumeratorContext<HiveSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+
+        final ContinuousHiveSplitEnumerator<Long> enumerator =
+                new ContinuousHiveSplitEnumerator(
+                        context,
+                        0L,
+                        Collections.emptySet(),
+                        new SimpleSplitAssigner(Collections.emptyList()),
+                        1000L,
+                        new JobConf(),
+                        new ObjectPath("testDb", "testTable"),
+                        mockPartitionFetcher(),
+                        new MockHiveContinuousPartitionFetcherContext(
+                                new ObjectPath("testDb", "testTable")));
+        enumerator.start();
+        // register one reader, and let it request a split
+        context.registerReader(2, "localhost");
+        enumerator.addReader(2);
+        enumerator.handleSplitRequest(2, "localhost");
+        final HiveSourceSplit split = createSplit();
+        enumerator.addSplitsBack(Collections.singletonList(split), 0);
+        context.triggerAllActions();
+
+        assertThat(enumerator.snapshotState(1L).getSplits(), empty());
+        assertThat(context.getSplitAssignments().get(2).getAssignedSplits(), contains(split));
+    }
+
+    @Test
+    public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
+        final TestingSplitEnumeratorContext<HiveSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+
+        final ContinuousHiveSplitEnumerator<Long> enumerator =
+                new ContinuousHiveSplitEnumerator(
+                        context,
+                        0L,
+                        Collections.emptySet(),
+                        new SimpleSplitAssigner(Collections.emptyList()),
+                        1000L,
+                        new JobConf(),
+                        new ObjectPath("testDb", "testTable"),
+                        mockPartitionFetcher(),
+                        new MockHiveContinuousPartitionFetcherContext(
+                                new ObjectPath("testDb", "testTable")));
+        enumerator.start();
+        // register one reader, and let it request a split
+        context.registerReader(2, "localhost");
+        enumerator.addReader(2);
+        enumerator.handleSplitRequest(2, "localhost");
+
+        // remove the reader (like in a failure)
+        context.registeredReaders().remove(2);
+        final HiveSourceSplit split = createSplit();
+        enumerator.addSplitsBack(Collections.singletonList(split), 0);
+        context.triggerAllActions();
+
+        assertFalse(context.getSplitAssignments().containsKey(2));
+        assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
+    }
+
+    private HiveSourceSplit createSplit() {
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setLocation("/tmp");
+        return new HiveSourceSplit(
+                "1",
+                new Path("/tmp"),
+                0,
+                0,
+                new String[] {"host1"},
+                null,
+                new HiveTablePartition(sd, new HashMap<>(), new Properties()));
+    }
+
+    private ContinuousPartitionFetcher<Partition, Long> mockPartitionFetcher() {
+        return new ContinuousPartitionFetcher<Partition, Long>() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public List<Tuple2<Partition, Long>> fetchPartitions(
+                    Context<Partition, Long> context, Long previousOffset) throws Exception {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public List<Partition> fetch(PartitionFetcher.Context<Partition> context)
+                    throws Exception {
+                return Collections.emptyList();
+            }
+        };
+    }
+
+    private static class MockHiveContinuousPartitionFetcherContext
+            extends HiveTableSource.HiveContinuousPartitionFetcherContext<Long> {
+
+        public MockHiveContinuousPartitionFetcherContext(ObjectPath tablePath) {
+            super(tablePath, null, null, null, null, null, new Configuration(), "default");
+        }
+
+        @Override
+        public void close() throws Exception {}
+
+        @Override
+        public void open() throws Exception {}
+
+        @Override
+        public Optional<Partition> getPartition(List<String> partValues) throws TException {
+            return Optional.empty();
+        }
+
+        @Override
+        public HiveTablePartition toHiveTablePartition(Partition partition) {
+            return new HiveTablePartition(partition.getSd(), new HashMap<>(), new Properties());
+        }
+    }
+}

Reply via email to