morningman commented on code in PR #44001:
URL: https://github.com/apache/doris/pull/44001#discussion_r1888191170
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -742,121 +738,37 @@ public LoadingCache<PartitionCacheKey, HivePartition>
getPartitionCache() {
return partitionCache;
}
- public List<FileCacheValue> getFilesByTransaction(List<HivePartition>
partitions, ValidWriteIdList validWriteIds,
- boolean isFullAcid, boolean skipCheckingAcidVersionFile, long
tableId, String bindBrokerName) {
+ public List<FileCacheValue> getFilesByTransaction(List<HivePartition>
partitions, Map<String, String> txnValidIds,
+ boolean isFullAcid, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
+
try {
+ if (partitions.isEmpty()) {
+ return fileCacheValues;
+ }
+
for (HivePartition partition : partitions) {
- FileCacheValue fileCacheValue = new FileCacheValue();
- AcidUtils.Directory directory;
+ //Get filesystem multiple times, reason:
https://github.com/apache/doris/pull/23409.
+ RemoteFileSystem fileSystem =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+ new FileSystemCache.FileSystemCacheKey(
+
LocationPath.getFSIdentity(partition.getPath(), bindBrokerName),
+ catalog.getCatalogProperty().getProperties(),
bindBrokerName, jobConf));
+
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(remoteUser);
- directory =
ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () ->
AcidUtils.getAcidState(
- new Path(partition.getPath()), jobConf,
validWriteIds, false, true));
+ fileCacheValues.add(
+
ugi.doAs((PrivilegedExceptionAction<FileCacheValue>) () ->
AcidUtil.getAcidState(
+ fileSystem, partition, txnValidIds,
catalog.getProperties()))
+ );
} else {
- directory = AcidUtils.getAcidState(new
Path(partition.getPath()), jobConf, validWriteIds, false,
- true);
- }
- if (directory == null) {
- return Collections.emptyList();
- }
- if (!directory.getOriginalFiles().isEmpty()) {
- throw new Exception("Original non-ACID files in
transactional tables are not supported");
- }
-
- if (isFullAcid) {
- int acidVersion = 2;
- /**
- * From Hive version >= 3.0, delta/base files will always
have file '_orc_acid_version'
- * with value >= '2'.
- */
- Path baseOrDeltaPath = directory.getBaseDirectory() !=
null ? directory.getBaseDirectory() :
- !directory.getCurrentDirectories().isEmpty() ?
directory.getCurrentDirectories().get(0)
- .getPath() : null;
- if (baseOrDeltaPath == null) {
- return Collections.emptyList();
- }
- if (!skipCheckingAcidVersionFile) {
- String acidVersionPath = new Path(baseOrDeltaPath,
"_orc_acid_version").toUri().toString();
- RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(
-
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
- bindBrokerName),
-
catalog.getCatalogProperty().getProperties(),
- bindBrokerName, jobConf));
- Status status = fs.exists(acidVersionPath);
- if (status != Status.OK) {
- if (status.getErrCode() == ErrCode.NOT_FOUND) {
- acidVersion = 0;
- } else {
- throw new Exception(String.format("Failed to
check remote path {} exists.",
- acidVersionPath));
- }
- }
- if (acidVersion == 0 &&
!directory.getCurrentDirectories().isEmpty()) {
- throw new Exception(
- "Hive 2.x versioned full-acid tables need
to run major compaction.");
- }
- }
- }
-
- // delta directories
- List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
- for (AcidUtils.ParsedDelta delta :
directory.getCurrentDirectories()) {
- String location = delta.getPath().toString();
- RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(
- LocationPath.getFSIdentity(location,
bindBrokerName),
-
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
- List<RemoteFile> remoteFiles = new ArrayList<>();
- Status status = fs.listFiles(location, false, remoteFiles);
- if (status.ok()) {
- if (delta.isDeleteDelta()) {
- List<String> deleteDeltaFileNames =
remoteFiles.stream().map(f -> f.getName()).filter(
- name ->
name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
- .collect(Collectors.toList());
- deleteDeltas.add(new DeleteDeltaInfo(location,
deleteDeltaFileNames));
- continue;
- }
- remoteFiles.stream().filter(
- f ->
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
- LocationPath path = new
LocationPath(file.getPath().toString(),
- catalog.getProperties());
- fileCacheValue.addFile(file, path);
- });
- } else {
- throw new RuntimeException(status.getErrMsg());
- }
- }
-
- // base
- if (directory.getBaseDirectory() != null) {
- String location = directory.getBaseDirectory().toString();
- RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(
- LocationPath.getFSIdentity(location,
bindBrokerName),
-
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
- List<RemoteFile> remoteFiles = new ArrayList<>();
- Status status = fs.listFiles(location, false, remoteFiles);
- if (status.ok()) {
- remoteFiles.stream().filter(
- f ->
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
- .forEach(file -> {
- LocationPath path = new
LocationPath(file.getPath().toString(),
- catalog.getProperties());
- fileCacheValue.addFile(file, path);
- });
- } else {
- throw new RuntimeException(status.getErrMsg());
- }
+ fileCacheValues.add(AcidUtil.getAcidState(
Review Comment:
This branch also needs to be wrapped in `ugi.doAs`, the same as the branch above.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java:
##########
@@ -267,7 +266,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache
cache, List<HivePartiti
List<Split> allFiles, String bindBrokerName, int numBackends)
throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
- fileCaches = getFileSplitByTransaction(cache, partitions,
bindBrokerName);
+ try {
+ fileCaches = getFileSplitByTransaction(cache, partitions,
bindBrokerName);
+ } catch (Exception e) {
+ // Release the shared lock (getValidWriteIds acquires a Lock).
+ // If no exception is thrown, the lock will be released in
`finalizeQuery()`.
+
Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
Review Comment:
It is not a good design to `deregister` the transaction here.
It looks very error-prone.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]