This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0e17cd4d92 [fix](hudi) use hudi api to split the COW table (#21385)
0e17cd4d92 is described below
commit 0e17cd4d9203ee1621baa5e75eb313a56d7e455d
Author: Ashin Gau <[email protected]>
AuthorDate: Sat Jul 1 08:35:33 2023 +0800
[fix](hudi) use hudi api to split the COW table (#21385)
Fix two bugs:
COW & Read Optimized tables will use the hive splitter to split files, but it
can't recognize some specific files.
ERROR 1105 (HY000): errCode = 2, detailMessage =
(172.21.0.101)[CORRUPTION]Invalid magic number in parquet file, bytes read:
3035, file size: 3035,
path:
/usr/hive/warehouse/hudi.db/test/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight,
read magic:
The read optimized table created by Spark will add an empty partition key even if
the table has no partitions, so we have to filter out these empty partition keys in
the Hive client.
| test_ro | CREATE TABLE `test_ro`(
`_hoodie_commit_time` string COMMENT '',
...
`ts` bigint COMMENT '')
PARTITIONED BY (
`` string)
ROW FORMAT SERDE
---
.../doris/planner/external/HiveScanNode.java | 5 +-
.../doris/planner/external/hudi/HudiScanNode.java | 71 +++++++++++-----------
2 files changed, 38 insertions(+), 38 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index b8ac376307..738a2e3933 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -215,8 +215,9 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
public List<String> getPathPartitionKeys() {
- return hmsTable.getRemoteTable().getPartitionKeys()
-
.stream().map(FieldSchema::getName).map(String::toLowerCase).collect(Collectors.toList());
+ return hmsTable.getRemoteTable().getPartitionKeys().stream()
+ .map(FieldSchema::getName).filter(partitionKey ->
!"".equals(partitionKey))
+ .map(String::toLowerCase).collect(Collectors.toList());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index fdb50e78a7..3c4fb0d1fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -56,7 +55,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -141,15 +139,6 @@ public class HudiScanNode extends HiveScanNode {
@Override
public List<Split> getSplits() throws UserException {
- if (isCowTable) {
- // skip hidden files start with "."
- List<Split> cowSplits = super.getSplits().stream()
- .filter(split -> !((FileSplit)
split).getPath().getName().startsWith("."))
- .collect(Collectors.toList());
- noLogsSplitNum = cowSplits.size();
- return cowSplits;
- }
-
HoodieTableMetaClient hudiClient =
HiveMetaStoreClientHelper.getHudiClient(hmsTable);
hudiClient.reloadActiveTimeline();
String basePath = hmsTable.getRemoteTable().getSd().getLocation();
@@ -207,32 +196,42 @@ public class HudiScanNode extends HiveScanNode {
HoodieTableFileSystemView fileSystemView = new
HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));
- Iterator<FileSlice> hoodieFileSliceIterator = fileSystemView
- .getLatestMergedFileSlicesBeforeOrOn(partitionName,
queryInstant).iterator();
- while (hoodieFileSliceIterator.hasNext()) {
- FileSlice fileSlice = hoodieFileSliceIterator.next();
- Optional<HoodieBaseFile> baseFile =
fileSlice.getBaseFile().toJavaOptional();
- String filePath =
baseFile.map(BaseFile::getPath).orElse("");
- long fileSize =
baseFile.map(BaseFile::getFileSize).orElse(0L);
-
- List<String> logs =
fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
- .collect(Collectors.toList());
- if (logs.isEmpty()) {
+ if (isCowTable) {
+ fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName,
queryInstant).forEach(baseFile -> {
noLogsSplitNum++;
- }
-
- HudiSplit split = new HudiSplit(new Path(filePath), 0,
fileSize, fileSize, new String[0],
- partition.getPartitionValues());
- split.setTableFormatType(TableFormatType.HUDI);
- split.setDataFilePath(filePath);
- split.setHudiDeltaLogs(logs);
- split.setInputFormat(inputFormat);
- split.setSerde(serdeLib);
- split.setBasePath(basePath);
- split.setHudiColumnNames(columnNames);
- split.setHudiColumnTypes(columnTypes);
- split.setInstantTime(queryInstant);
- splits.add(split);
+ String filePath = baseFile.getPath();
+ long fileSize = baseFile.getFileSize();
+ FileSplit split = new FileSplit(new Path(filePath), 0,
fileSize, fileSize, new String[0],
+ partition.getPartitionValues());
+ splits.add(split);
+ });
+ } else {
+
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+ .forEach(fileSlice -> {
+ Optional<HoodieBaseFile> baseFile =
fileSlice.getBaseFile().toJavaOptional();
+ String filePath =
baseFile.map(BaseFile::getPath).orElse("");
+ long fileSize =
baseFile.map(BaseFile::getFileSize).orElse(0L);
+
+ List<String> logs =
fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+ .map(Path::toString)
+ .collect(Collectors.toList());
+ if (logs.isEmpty()) {
+ noLogsSplitNum++;
+ }
+
+ HudiSplit split = new HudiSplit(new
Path(filePath), 0, fileSize, fileSize,
+ new String[0],
partition.getPartitionValues());
+ split.setTableFormatType(TableFormatType.HUDI);
+ split.setDataFilePath(filePath);
+ split.setHudiDeltaLogs(logs);
+ split.setInputFormat(inputFormat);
+ split.setSerde(serdeLib);
+ split.setBasePath(basePath);
+ split.setHudiColumnNames(columnNames);
+ split.setHudiColumnTypes(columnTypes);
+ split.setInstantTime(queryInstant);
+ splits.add(split);
+ });
}
}
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]