This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a80f769  [FLINK-18232][hive] Fix Hive streaming source bugs
a80f769 is described below

commit a80f7690367e9a07b19321f3b34d8fdba17295b8
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Jun 11 10:30:46 2020 +0800

    [FLINK-18232][hive] Fix Hive streaming source bugs
    
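    The patch addresses several issues in the streaming Hive source:
    - interpret 'streaming-source.consume-start-offset' as local-zone
      milliseconds rather than UTC milliseconds in HiveTableSource;
    - in DirectoryMonitorDiscovery, convert partition file modification
      times to the same basis before comparing them with the previous
      timestamp, and split the filtering into a testable
      suitablePartitions() method;
    - in HiveMapredSplitReader, populate partition columns once on a
      reader-owned reuse row instead of relying on a caller-supplied row;
    - in HiveTableFileInputFormat, replace a -1 split length with the real
      remaining file length before building the Hadoop FileSplit.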
    
    This closes #12573
---
 .../flink/connectors/hive/HiveTableSource.java     |  7 +-
 .../hive/read/DirectoryMonitorDiscovery.java       | 32 +++++++-
 .../hive/read/HiveMapredSplitReader.java           | 49 +++++-------
 .../hive/read/HiveTableFileInputFormat.java        | 19 ++++-
 .../connectors/hive/HiveTableSourceITCase.java     | 10 ++-
 .../hive/read/DirectoryMonitorDiscoveryTest.java   | 90 ++++++++++++++++++++++
 .../hive/read/HiveTableFileInputFormatTest.java    | 50 ++++++++++++
 7 files changed, 214 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 080cea8..2be4443 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.filesystem.FileSystemLookupFunction;
 import org.apache.flink.table.filesystem.FileSystemOptions;
 import org.apache.flink.table.functions.AsyncTableFunction;
@@ -87,7 +88,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toLocalDateTime;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
@@ -283,7 +284,9 @@ public class HiveTableSource implements
                }
 
                String consumeOffset = configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
-               long currentReadTime = toMills(consumeOffset);
+               // to Local zone mills instead of UTC mills
+               long currentReadTime = TimestampData.fromLocalDateTime(toLocalDateTime(consumeOffset))
+                               .toTimestamp().getTime();
 
                Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
 
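For context, a minimal plain-JDK sketch (illustrative only, not part of the
patch; the class name and offset value are made up) of the distinction the
change above relies on: the same wall-clock offset string maps to different
epoch milliseconds depending on whether it is read as UTC or as local time,
and only the local-zone reading lines up with java.sql.Timestamp-based
modification times.

    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    // Hypothetical illustration; not part of this commit.
    public class ConsumeOffsetSketch {
        public static void main(String[] args) {
            // Example value for 'streaming-source.consume-start-offset'.
            LocalDateTime offset = LocalDateTime.parse("2020-05-06T00:00:00");

            // UTC reading (roughly what the removed toMills() produced).
            long utcMillis = offset.toInstant(ZoneOffset.UTC).toEpochMilli();

            // Local-zone reading (roughly what the new conversion produces),
            // comparable with java.sql.Timestamp#getTime() values.
            long localMillis = Timestamp.valueOf(offset).getTime();

            // Differs by the local UTC offset unless the JVM runs in UTC.
            System.out.println(localMillis - utcMillis);
        }
    }
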
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
index f027302..aa15c81 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.connectors.hive.read;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.TimestampData;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -26,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Partition;
 
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,18 +44,39 @@ public class DirectoryMonitorDiscovery implements PartitionDiscovery {
                        Context context, long previousTimestamp) throws Exception {
                FileStatus[] statuses = getFileStatusRecurse(
                                context.tableLocation(), context.partitionKeys().size(), context.fileSystem());
+               List<Tuple2<List<String>, Long>> partValueList = suitablePartitions(context, previousTimestamp, statuses);
+
                List<Tuple2<Partition, Long>> partitions = new ArrayList<>();
+               for (Tuple2<List<String>, Long> tuple2 : partValueList) {
+                       context.getPartition(tuple2.f0).ifPresent(
+                                       partition -> partitions.add(new Tuple2<>(partition, tuple2.f1)));
+               }
+               return partitions;
+       }
+
+       /**
+        * Find suitable partitions, extract timestamp and compare it with previousTimestamp.
+        */
+       @VisibleForTesting
+       static List<Tuple2<List<String>, Long>> suitablePartitions(
+                       Context context,
+                       long previousTimestamp,
+                       FileStatus[] statuses) {
+               List<Tuple2<List<String>, Long>> partValueList = new ArrayList<>();
                for (FileStatus status : statuses) {
                        List<String> partValues = extractPartitionValues(
                                        new org.apache.flink.core.fs.Path(status.getPath().toString()));
                        long timestamp = context.extractTimestamp(
-                                       context.partitionKeys(), partValues, status::getModificationTime);
+                                       context.partitionKeys(),
+                                       partValues,
+                                       // to UTC millisecond.
+                                       () -> TimestampData.fromTimestamp(
+                                                       new Timestamp(status.getModificationTime())).getMillisecond());
                        if (timestamp >= previousTimestamp) {
-                               context.getPartition(partValues).ifPresent(
-                                               partition -> partitions.add(new Tuple2<>(partition, timestamp)));
+                               partValueList.add(new Tuple2<>(partValues, timestamp));
                        }
                }
-               return partitions;
+               return partValueList;
        }
 
        private static FileStatus[] getFileStatusRecurse(Path path, int expectLevel, FileSystem fs) {
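
The new supplier above routes the file modification time through
TimestampData before it is compared against previousTimestamp. A plain-JDK
sketch (illustrative only, not part of the patch) of what that rebasing
amounts to, under the assumption that TimestampData stores the wall-clock
reading of the Timestamp:

    import java.sql.Timestamp;
    import java.time.ZoneId;
    import java.time.ZoneOffset;

    // Hypothetical illustration; not part of this commit.
    public class ModificationTimeSketch {
        public static void main(String[] args) {
            long modificationTime = 1588766400000L; // any epoch millis

            // Read the instant as local wall-clock time, then re-read that
            // wall-clock value as if it were UTC.
            Timestamp ts = new Timestamp(modificationTime);
            long rebased = ts.toLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli();

            long zoneOffsetMillis = ZoneId.systemDefault().getRules()
                    .getOffset(ts.toInstant()).getTotalSeconds() * 1000L;

            // The rebased value differs from the raw one by the local zone offset.
            System.out.println((rebased - modificationTime) == zoneOffsetMillis);
        }
    }
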
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
index bc4ff13..843f746 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
@@ -74,18 +74,12 @@ public class HiveMapredSplitReader implements SplitReader {
        //StructObjectInspector in hive helps us to look into the internal structure of a struct object.
        private final StructObjectInspector structObjectInspector;
 
-       // Remember whether a row instance is reused. No need to set partition fields for reused rows
-       private boolean rowReused = false;
-
-       //Necessary info to init deserializer
-       private final List<String> partitionKeys;
-
        private final DataFormatConverters.DataFormatConverter[] converters;
 
-       private final HiveTablePartition hiveTablePartition;
-
        private final HiveShim hiveShim;
 
+       private final GenericRowData row;
+
        public HiveMapredSplitReader(
                        JobConf jobConf,
                        List<String> partitionKeys,
@@ -93,7 +87,7 @@ public class HiveMapredSplitReader implements SplitReader {
                        int[] selectedFields,
                        HiveTableInputSplit split,
                        HiveShim hiveShim) throws IOException {
-               this.hiveTablePartition = split.getHiveTablePartition();
+               HiveTablePartition hiveTablePartition = split.getHiveTablePartition();
                StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
                jobConf.set(INPUT_DIR, sd.getLocation());
                InputFormat mapredInputFormat;
@@ -128,18 +122,30 @@ public class HiveMapredSplitReader implements SplitReader {
                }
 
                this.selectedFields = selectedFields;
-               this.partitionKeys = partitionKeys;
-               converters = Arrays.stream(selectedFields)
+               this.converters = Arrays.stream(selectedFields)
                                .mapToObj(i -> fieldTypes[i])
                                .map(DataFormatConverters::getConverterForDataType)
                                .toArray(DataFormatConverters.DataFormatConverter[]::new);
                this.hiveShim = hiveShim;
+
+               // construct reuse row
+               this.row = new GenericRowData(selectedFields.length);
+               // set partition columns
+               if (!partitionKeys.isEmpty()) {
+                       for (int i = 0; i < selectedFields.length; i++) {
+                               if (selectedFields[i] >= structFields.size()) {
+                                       String partition = partitionKeys.get(selectedFields[i] - structFields.size());
+                                       row.setField(i, converters[i].toInternal(hiveTablePartition.getPartitionSpec().get(partition)));
+                               }
+                       }
+               }
        }
 
        @Override
        public boolean reachedEnd() throws IOException {
                if (!fetched) {
-                       fetchNext();
+                       hasNext = recordReader.next(key, value);
+                       fetched = true;
                }
                return !hasNext;
        }
@@ -150,8 +156,6 @@ public class HiveMapredSplitReader implements SplitReader {
                if (reachedEnd()) {
                        return null;
                }
-               final GenericRowData row = reuse instanceof GenericRowData ?
-                               (GenericRowData) reuse : new GenericRowData(selectedFields.length);
                try {
                        //Use HiveDeserializer to deserialize an object out of a Writable blob
                        Object hiveRowStruct = deserializer.deserialize(value);
@@ -168,27 +172,10 @@ public class HiveMapredSplitReader implements SplitReader {
                        LOG.error("Error happens when converting hive data type to flink data type.");
                        throw new FlinkHiveException(e);
                }
-               if (!rowReused) {
-                       // set partition columns
-                       if (!partitionKeys.isEmpty()) {
-                               for (int i = 0; i < selectedFields.length; i++) {
-                                       if (selectedFields[i] >= structFields.size()) {
-                                               String partition = partitionKeys.get(selectedFields[i] - structFields.size());
-                                               row.setField(i, converters[i].toInternal(hiveTablePartition.getPartitionSpec().get(partition)));
-                                       }
-                               }
-                       }
-                       rowReused = true;
-               }
                this.fetched = false;
                return row;
        }
 
-       private void fetchNext() throws IOException {
-               hasNext = recordReader.next(key, value);
-               fetched = true;
-       }
-
        @Override
        public void close() throws IOException {
                if (this.recordReader != null) {
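
The reader now owns a single reuse row and fills the partition columns once
in the constructor, so per-record work only touches the data fields. A small
standalone sketch (hypothetical types, no Flink or Hive dependencies) of that
reuse-row pattern:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical illustration; not part of this commit.
    public class ReuseRowSketch {

        static final class Reader {
            private final Object[] row = new Object[2]; // [data, partition]
            private final Iterator<String> records;

            Reader(List<String> records, String partitionValue) {
                this.records = records.iterator();
                // Partition column is constant for the split: set it exactly once.
                this.row[1] = partitionValue;
            }

            Object[] next() {
                if (!records.hasNext()) {
                    return null;
                }
                // Only the non-partition field changes between records.
                this.row[0] = records.next();
                return this.row;
            }
        }

        public static void main(String[] args) {
            Reader reader = new Reader(Arrays.asList("a", "b"), "ts=2020-05-06");
            for (Object[] r = reader.next(); r != null; r = reader.next()) {
                System.out.println(Arrays.toString(r));
            }
        }
    }
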
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
index eb1fb41..29e890b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive.read;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -52,16 +53,26 @@ public class HiveTableFileInputFormat extends FileInputFormat<RowData> {
 
        @Override
        public void open(FileInputSplit fileSplit) throws IOException {
-               URI uri = fileSplit.getPath().toUri();
                HiveTableInputSplit split = new HiveTableInputSplit(
                                fileSplit.getSplitNumber(),
-                               new FileSplit(new Path(uri), fileSplit.getStart(), fileSplit.getLength(), (String[]) null),
+                               toHadoopFileSplit(fileSplit),
                                inputFormat.getJobConf(),
-                               hiveTablePartition
-               );
+                               hiveTablePartition);
                inputFormat.open(split);
        }
 
+       @VisibleForTesting
+       static FileSplit toHadoopFileSplit(FileInputSplit fileSplit) throws IOException {
+               URI uri = fileSplit.getPath().toUri();
+               long length = fileSplit.getLength();
+               // Hadoop FileSplit should not have -1 length.
+               if (length == -1) {
+                       length = fileSplit.getPath().getFileSystem().getFileStatus(fileSplit.getPath()).getLen() -
+                                       fileSplit.getStart();
+               }
+               return new FileSplit(new Path(uri), fileSplit.getStart(), length, (String[]) null);
+       }
+
        @Override
        public boolean reachedEnd() throws IOException {
                return inputFormat.reachedEnd();
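
toHadoopFileSplit() above normalises a Flink split length of -1, which means
"until the end of the file", into the concrete byte count that a Hadoop
FileSplit requires. A self-contained JDK sketch (hypothetical method name;
java.nio stands in for Flink's FileSystem API):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical illustration; not part of this commit.
    public class SplitLengthSketch {

        static long normalizeLength(Path file, long start, long length) throws IOException {
            // -1 means "read until the end of the file"; Hadoop needs a real length.
            return length == -1 ? Files.size(file) - start : length;
        }

        public static void main(String[] args) throws IOException {
            Path file = Files.createTempFile("split", ".txt");
            Files.write(file, "hahahahahahaha".getBytes(StandardCharsets.UTF_8));
            // Mirrors the new HiveTableFileInputFormatTest: 14 bytes, length -1.
            System.out.println(normalizeLength(file, 0, -1)); // 14
        }
    }
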
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 32a5cd4..5170920 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -473,7 +473,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                                " b STRING" +
                                ") PARTITIONED BY (ts STRING) TBLPROPERTIES (" +
                                "'streaming-source.enable'='true'," +
-                               "'streaming-source.monitor-interval'='100ms'," +
+                               "'streaming-source.monitor-interval'='1s'," +
                                "'streaming-source.consume-order'='partition-time'" +
                                ")");
 
@@ -502,6 +502,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                                }
                                HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
                                                .addRow(new Object[]{i, String.valueOf(i)})
+                                               .addRow(new Object[]{i, i + "_copy"})
                                                .commit("ts='2020-05-06 00:" + i + "0:00'");
                        }
                };
@@ -514,10 +515,15 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
                List<String> expected = Arrays.asList(
                                "+I(0,0,2020-05-06 00:00:00)",
                                "+I(1,1,2020-05-06 00:10:00)",
+                               "+I(1,1_copy,2020-05-06 00:10:00)",
                                "+I(2,2,2020-05-06 00:20:00)",
+                               "+I(2,2_copy,2020-05-06 00:20:00)",
                                "+I(3,3,2020-05-06 00:30:00)",
+                               "+I(3,3_copy,2020-05-06 00:30:00)",
                                "+I(4,4,2020-05-06 00:40:00)",
-                               "+I(5,5,2020-05-06 00:50:00)"
+                               "+I(4,4_copy,2020-05-06 00:40:00)",
+                               "+I(5,5,2020-05-06 00:50:00)",
+                               "+I(5,5_copy,2020-05-06 00:50:00)"
                );
                List<String> results = sink.getJavaAppendResults();
                results.sort(String::compareTo);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java
new file mode 100644
index 0000000..ad514d4
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Test for {@link DirectoryMonitorDiscovery}.
+ */
+public class DirectoryMonitorDiscoveryTest {
+
+       private static FileStatus status(String time) {
+               return new FileStatus(0L, false, 0, 0L, Timestamp.valueOf(time).getTime(), 0L, null, null, null, new Path("/tmp/dummy"));
+       }
+
+       @Test
+       public void testUTC() {
+               long previousTimestamp = TimestampData.fromTimestamp(Timestamp.valueOf("2020-05-06 12:22:00")).getMillisecond();
+               FileStatus[] statuses = new FileStatus[] {
+                               status("2020-05-06 12:20:00"),
+                               status("2020-05-06 12:21:00"),
+                               status("2020-05-06 12:22:00"),
+                               status("2020-05-06 12:23:00"),
+                               status("2020-05-06 12:24:00")};
+               List<Tuple2<List<String>, Long>> parts = DirectoryMonitorDiscovery.suitablePartitions(
+                               new PartitionDiscovery.Context() {
+
+                                       @Override
+                                       public List<String> partitionKeys() {
+                                               return null;
+                                       }
+
+                                       @Override
+                                       public Optional<Partition> getPartition(List<String> partValues) {
+                                               return Optional.empty();
+                                       }
+
+                                       @Override
+                                       public FileSystem fileSystem() {
+                                               return null;
+                                       }
+
+                                       @Override
+                                       public Path tableLocation() {
+                                               return null;
+                                       }
+
+                                       @Override
+                                       public long extractTimestamp(List<String> partKeys, List<String> partValues,
+                                                       Supplier<Long> fileTime) {
+                                               return fileTime.get();
+                                       }
+                               },
+                               previousTimestamp,
+                               statuses);
+               Assert.assertEquals(3, parts.size());
+               Assert.assertEquals("2020-05-06T12:22", TimestampData.fromEpochMillis(parts.get(0).f1).toString());
+               Assert.assertEquals("2020-05-06T12:23", TimestampData.fromEpochMillis(parts.get(1).f1).toString());
+               Assert.assertEquals("2020-05-06T12:24", TimestampData.fromEpochMillis(parts.get(2).f1).toString());
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java
new file mode 100644
index 0000000..9a4fb37
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Test for {@link HiveTableFileInputFormat}.
+ */
+public class HiveTableFileInputFormatTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       @Test
+       public void testSplit() throws IOException {
+               File file = TEMPORARY_FOLDER.newFile();
+               FileUtils.writeFileUtf8(file, "hahahahahahaha");
+               FileInputSplit split = new FileInputSplit(0, new Path(file.getPath()), 0, -1, null);
+               FileSplit fileSplit = HiveTableFileInputFormat.toHadoopFileSplit(split);
+               Assert.assertEquals(14, fileSplit.getLength());
+       }
+}
