This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a80f769  [FLINK-18232][hive] Fix Hive streaming source bugs
a80f769 is described below

commit a80f7690367e9a07b19321f3b34d8fdba17295b8
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Jun 11 10:30:46 2020 +0800

    [FLINK-18232][hive] Fix Hive streaming source bugs

    This closes #12573
---
 .../flink/connectors/hive/HiveTableSource.java     |  7 +-
 .../hive/read/DirectoryMonitorDiscovery.java       | 32 +++++++-
 .../hive/read/HiveMapredSplitReader.java           | 49 +++++-------
 .../hive/read/HiveTableFileInputFormat.java        | 19 ++++-
 .../connectors/hive/HiveTableSourceITCase.java     | 10 ++-
 .../hive/read/DirectoryMonitorDiscoveryTest.java   | 90 ++++++++++++++++++++++
 .../hive/read/HiveTableFileInputFormatTest.java    | 50 ++++++++++++
 7 files changed, 214 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 080cea8..2be4443 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.filesystem.FileSystemLookupFunction;
 import org.apache.flink.table.filesystem.FileSystemOptions;
 import org.apache.flink.table.functions.AsyncTableFunction;
@@ -87,7 +88,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
+import static org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toLocalDateTime;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND;
 import static org.apache.flink.table.filesystem.FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
@@ -283,7 +284,9 @@ public class HiveTableSource implements
 			}
 
 			String consumeOffset = configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
-			long currentReadTime = toMills(consumeOffset);
+			// to Local zone mills instead of UTC mills
+			long currentReadTime = TimestampData.fromLocalDateTime(toLocalDateTime(consumeOffset))
+					.toTimestamp().getTime();
 
 			Duration monitorInterval = configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
index f027302..aa15c81 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscovery.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.connectors.hive.read;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.TimestampData;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -26,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Partition;
 
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,18 +44,39 @@ public class DirectoryMonitorDiscovery implements PartitionDiscovery {
 			Context context,
 			long previousTimestamp) throws Exception {
 		FileStatus[] statuses = getFileStatusRecurse(
 				context.tableLocation(), context.partitionKeys().size(), context.fileSystem());
+		List<Tuple2<List<String>, Long>> partValueList = suitablePartitions(context, previousTimestamp, statuses);
+		List<Tuple2<Partition, Long>> partitions = new ArrayList<>();
+		for (Tuple2<List<String>, Long> tuple2 : partValueList) {
+			context.getPartition(tuple2.f0).ifPresent(
+					partition -> partitions.add(new Tuple2<>(partition, tuple2.f1)));
+		}
+		return partitions;
+	}
+
+	/**
+	 * Find suitable partitions, extract timestamp and compare it with previousTimestamp.
+	 */
+	@VisibleForTesting
+	static List<Tuple2<List<String>, Long>> suitablePartitions(
+			Context context,
+			long previousTimestamp,
+			FileStatus[] statuses) {
+		List<Tuple2<List<String>, Long>> partValueList = new ArrayList<>();
 		for (FileStatus status : statuses) {
 			List<String> partValues = extractPartitionValues(
 					new org.apache.flink.core.fs.Path(status.getPath().toString()));
 			long timestamp = context.extractTimestamp(
-					context.partitionKeys(), partValues, status::getModificationTime);
+					context.partitionKeys(),
+					partValues,
+					// to UTC millisecond.
+					() -> TimestampData.fromTimestamp(
+							new Timestamp(status.getModificationTime())).getMillisecond());
 			if (timestamp >= previousTimestamp) {
-				context.getPartition(partValues).ifPresent(
-						partition -> partitions.add(new Tuple2<>(partition, timestamp)));
+				partValueList.add(new Tuple2<>(partValues, timestamp));
 			}
 		}
-		return partitions;
+		return partValueList;
 	}
 
 	private static FileStatus[] getFileStatusRecurse(Path path, int expectLevel, FileSystem fs) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
index bc4ff13..843f746 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
@@ -74,18 +74,12 @@ public class HiveMapredSplitReader implements SplitReader {
 	//StructObjectInspector in hive helps us to look into the internal structure of a struct object.
 	private final StructObjectInspector structObjectInspector;
 
-	// Remember whether a row instance is reused. No need to set partition fields for reused rows
-	private boolean rowReused = false;
-
-	//Necessary info to init deserializer
-	private final List<String> partitionKeys;
-
 	private final DataFormatConverters.DataFormatConverter[] converters;
 
-	private final HiveTablePartition hiveTablePartition;
-
 	private final HiveShim hiveShim;
 
+	private final GenericRowData row;
+
 	public HiveMapredSplitReader(
 			JobConf jobConf,
 			List<String> partitionKeys,
@@ -93,7 +87,7 @@ public class HiveMapredSplitReader implements SplitReader {
 			int[] selectedFields,
 			HiveTableInputSplit split,
 			HiveShim hiveShim) throws IOException {
-		this.hiveTablePartition = split.getHiveTablePartition();
+		HiveTablePartition hiveTablePartition = split.getHiveTablePartition();
 		StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
 		jobConf.set(INPUT_DIR, sd.getLocation());
 		InputFormat mapredInputFormat;
@@ -128,18 +122,30 @@ public class HiveMapredSplitReader implements SplitReader {
 		}
 
 		this.selectedFields = selectedFields;
-		this.partitionKeys = partitionKeys;
-		converters = Arrays.stream(selectedFields)
+		this.converters = Arrays.stream(selectedFields)
 				.mapToObj(i -> fieldTypes[i])
 				.map(DataFormatConverters::getConverterForDataType)
 				.toArray(DataFormatConverters.DataFormatConverter[]::new);
 		this.hiveShim = hiveShim;
+
+		// construct reuse row
+		this.row = new GenericRowData(selectedFields.length);
+		// set partition columns
+		if (!partitionKeys.isEmpty()) {
+			for (int i = 0; i < selectedFields.length; i++) {
+				if (selectedFields[i] >= structFields.size()) {
+					String partition = partitionKeys.get(selectedFields[i] - structFields.size());
+					row.setField(i, converters[i].toInternal(hiveTablePartition.getPartitionSpec().get(partition)));
+				}
+			}
+		}
 	}
 
 	@Override
 	public boolean reachedEnd() throws IOException {
 		if (!fetched) {
-			fetchNext();
+			hasNext = recordReader.next(key, value);
+			fetched = true;
 		}
 		return !hasNext;
 	}
@@ -150,8 +156,6 @@ public class HiveMapredSplitReader implements SplitReader {
 		if (reachedEnd()) {
 			return null;
 		}
-		final GenericRowData row = reuse instanceof GenericRowData ?
-				(GenericRowData) reuse : new GenericRowData(selectedFields.length);
 		try {
 			//Use HiveDeserializer to deserialize an object out of a Writable blob
 			Object hiveRowStruct = deserializer.deserialize(value);
@@ -168,27 +172,10 @@ public class HiveMapredSplitReader implements SplitReader {
 			LOG.error("Error happens when converting hive data type to flink data type.");
 			throw new FlinkHiveException(e);
 		}
-		if (!rowReused) {
-			// set partition columns
-			if (!partitionKeys.isEmpty()) {
-				for (int i = 0; i < selectedFields.length; i++) {
-					if (selectedFields[i] >= structFields.size()) {
-						String partition = partitionKeys.get(selectedFields[i] - structFields.size());
-						row.setField(i, converters[i].toInternal(hiveTablePartition.getPartitionSpec().get(partition)));
-					}
-				}
-			}
-			rowReused = true;
-		}
 		this.fetched = false;
 		return row;
 	}
 
-	private void fetchNext() throws IOException {
-		hasNext = recordReader.next(key, value);
-		fetched = true;
-	}
-
 	@Override
 	public void close() throws IOException {
 		if (this.recordReader != null) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
index eb1fb41..29e890b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connectors.hive.read;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -52,16 +53,26 @@ public class HiveTableFileInputFormat extends FileInputFormat<RowData> {
 
 	@Override
 	public void open(FileInputSplit fileSplit) throws IOException {
-		URI uri = fileSplit.getPath().toUri();
 		HiveTableInputSplit split = new HiveTableInputSplit(
 				fileSplit.getSplitNumber(),
-				new FileSplit(new Path(uri), fileSplit.getStart(), fileSplit.getLength(), (String[]) null),
+				toHadoopFileSplit(fileSplit),
 				inputFormat.getJobConf(),
-				hiveTablePartition
-		);
+				hiveTablePartition);
 		inputFormat.open(split);
 	}
 
+	@VisibleForTesting
+	static FileSplit toHadoopFileSplit(FileInputSplit fileSplit) throws IOException {
+		URI uri = fileSplit.getPath().toUri();
+		long length = fileSplit.getLength();
+		// Hadoop FileSplit should not have -1 length.
+		if (length == -1) {
+			length = fileSplit.getPath().getFileSystem().getFileStatus(fileSplit.getPath()).getLen() -
+					fileSplit.getStart();
+		}
+		return new FileSplit(new Path(uri), fileSplit.getStart(), length, (String[]) null);
+	}
+
 	@Override
 	public boolean reachedEnd() throws IOException {
 		return inputFormat.reachedEnd();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 32a5cd4..5170920 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -473,7 +473,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 				" b STRING" +
 				") PARTITIONED BY (ts STRING) TBLPROPERTIES (" +
 				"'streaming-source.enable'='true'," +
-				"'streaming-source.monitor-interval'='100ms'," +
+				"'streaming-source.monitor-interval'='1s'," +
 				"'streaming-source.consume-order'='partition-time'" +
 				")");
 
@@ -502,6 +502,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 			}
 			HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
 					.addRow(new Object[]{i, String.valueOf(i)})
+					.addRow(new Object[]{i, i + "_copy"})
 					.commit("ts='2020-05-06 00:" + i + "0:00'");
 		}
 	};
@@ -514,10 +515,15 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 		List<String> expected = Arrays.asList(
 				"+I(0,0,2020-05-06 00:00:00)",
 				"+I(1,1,2020-05-06 00:10:00)",
+				"+I(1,1_copy,2020-05-06 00:10:00)",
 				"+I(2,2,2020-05-06 00:20:00)",
+				"+I(2,2_copy,2020-05-06 00:20:00)",
 				"+I(3,3,2020-05-06 00:30:00)",
+				"+I(3,3_copy,2020-05-06 00:30:00)",
 				"+I(4,4,2020-05-06 00:40:00)",
-				"+I(5,5,2020-05-06 00:50:00)"
+				"+I(4,4_copy,2020-05-06 00:40:00)",
+				"+I(5,5,2020-05-06 00:50:00)",
+				"+I(5,5_copy,2020-05-06 00:50:00)"
 		);
 		List<String> results = sink.getJavaAppendResults();
 		results.sort(String::compareTo);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java
new file mode 100644
index 0000000..ad514d4
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/DirectoryMonitorDiscoveryTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Test for {@link DirectoryMonitorDiscovery}.
+ */
+public class DirectoryMonitorDiscoveryTest {
+
+	private static FileStatus status(String time) {
+		return new FileStatus(0L, false, 0, 0L, Timestamp.valueOf(time).getTime(), 0L, null, null, null, new Path("/tmp/dummy"));
+	}
+
+	@Test
+	public void testUTC() {
+		long previousTimestamp = TimestampData.fromTimestamp(Timestamp.valueOf("2020-05-06 12:22:00")).getMillisecond();
+		FileStatus[] statuses = new FileStatus[] {
+				status("2020-05-06 12:20:00"),
+				status("2020-05-06 12:21:00"),
+				status("2020-05-06 12:22:00"),
+				status("2020-05-06 12:23:00"),
+				status("2020-05-06 12:24:00")};
+		List<Tuple2<List<String>, Long>> parts = DirectoryMonitorDiscovery.suitablePartitions(
+				new PartitionDiscovery.Context() {
+
+					@Override
+					public List<String> partitionKeys() {
+						return null;
+					}
+
+					@Override
+					public Optional<Partition> getPartition(List<String> partValues) {
+						return Optional.empty();
+					}
+
+					@Override
+					public FileSystem fileSystem() {
+						return null;
+					}
+
+					@Override
+					public Path tableLocation() {
+						return null;
+					}
+
+					@Override
+					public long extractTimestamp(List<String> partKeys, List<String> partValues,
+							Supplier<Long> fileTime) {
+						return fileTime.get();
+					}
+				},
+				previousTimestamp,
+				statuses);
+		Assert.assertEquals(3, parts.size());
+		Assert.assertEquals("2020-05-06T12:22", TimestampData.fromEpochMillis(parts.get(0).f1).toString());
+		Assert.assertEquals("2020-05-06T12:23", TimestampData.fromEpochMillis(parts.get(1).f1).toString());
+		Assert.assertEquals("2020-05-06T12:24", TimestampData.fromEpochMillis(parts.get(2).f1).toString());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java
new file mode 100644
index 0000000..9a4fb37
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveTableFileInputFormatTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Test for {@link HiveTableFileInputFormat}.
+ */
+public class HiveTableFileInputFormatTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testSplit() throws IOException {
+		File file = TEMPORARY_FOLDER.newFile();
+		FileUtils.writeFileUtf8(file, "hahahahahahaha");
+		FileInputSplit split = new FileInputSplit(0, new Path(file.getPath()), 0, -1, null);
+		FileSplit fileSplit = HiveTableFileInputFormat.toHadoopFileSplit(split);
+		Assert.assertEquals(14, fileSplit.getLength());
+	}
+}
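
A note on the timezone handling above: the patch distinguishes two millisecond encodings of the same wall-clock time. java.sql.Timestamp#getTime() yields epoch milliseconds in the JVM's local zone, whereas Flink's TimestampData stores a zone-free value (the wall-clock fields laid onto the UTC epoch). HiveTableSource now derives currentReadTime in local-zone milliseconds, and DirectoryMonitorDiscovery normalizes file modification times to the zone-free form before comparing them with previousTimestamp. A minimal, JDK-only sketch of that distinction; the class and method names below are illustrative and not part of the patch:

    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    public class PartitionTimeMillisSketch {

        // Zone-free millis: the wall-clock fields interpreted as if they were UTC.
        // This matches what TimestampData.fromTimestamp(Timestamp.valueOf(t)).getMillisecond() returns.
        static long zoneFreeMillis(LocalDateTime t) {
            return t.toInstant(ZoneOffset.UTC).toEpochMilli();
        }

        // Local-zone epoch millis, as Timestamp#getTime() returns. The patch now uses this
        // form for currentReadTime via fromLocalDateTime(...).toTimestamp().getTime().
        static long localZoneMillis(LocalDateTime t) {
            return Timestamp.valueOf(t).getTime();
        }

        public static void main(String[] args) {
            LocalDateTime offset = LocalDateTime.parse("2020-05-06T00:00:00");
            // The two values differ by the JVM's UTC offset unless it runs in UTC, so mixing
            // them would shift the previousTimestamp comparison by that offset.
            System.out.println(zoneFreeMillis(offset) - localZoneMillis(offset));
        }
    }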
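
On the HiveMapredSplitReader change: the reused row is now created once in the constructor with the partition columns pre-populated, and each record only overwrites the projected non-partition fields, which is why the rowReused flag and the per-record partition handling in nextRecord could be removed. A standalone sketch of that reuse pattern with GenericRowData; the schema and values are made up for illustration and do not come from the patch:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;

    public class ReuseRowSketch {

        public static void main(String[] args) {
            // Hypothetical projected schema: [a INT, b STRING, ts STRING], where ts is a partition column.
            GenericRowData row = new GenericRowData(3);

            // The partition value is constant for the whole split, so it is set exactly once.
            row.setField(2, StringData.fromString("2020-05-06 00:00:00"));

            // Per record, only the non-partition fields are rewritten; field 2 keeps its value.
            for (int i = 0; i < 3; i++) {
                row.setField(0, i);
                row.setField(1, StringData.fromString("value-" + i));
                System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
            }
        }
    }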
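
On the HiveTableFileInputFormat change: a Flink FileInputSplit may carry a length of -1, meaning "read up to the end of the file", but Hadoop's FileSplit expects a concrete length, so toHadoopFileSplit resolves the real size from the file system first (this is what the new HiveTableFileInputFormatTest exercises with its -1-length split). The same logic, restated outside the diff purely for readability:

    import org.apache.flink.core.fs.FileInputSplit;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;

    import java.io.IOException;

    public class SplitConversionSketch {

        // Mirrors HiveTableFileInputFormat#toHadoopFileSplit: if the Flink split has length -1,
        // fall back to the actual file size minus the start offset.
        static FileSplit toHadoopFileSplit(FileInputSplit fileSplit) throws IOException {
            long length = fileSplit.getLength();
            if (length == -1) {
                length = fileSplit.getPath().getFileSystem()
                        .getFileStatus(fileSplit.getPath()).getLen() - fileSplit.getStart();
            }
            return new FileSplit(
                    new Path(fileSplit.getPath().toUri()), fileSplit.getStart(), length, (String[]) null);
        }
    }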