kgyrtkirk commented on a change in pull request #1724:
URL: https://github.com/apache/hive/pull/1724#discussion_r557422893
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
##########
@@ -774,6 +780,100 @@ private void checkFileFormats(Hive db, LoadTableDesc tbd,
Table table)
}
}
+ class LocalTableLock implements Closeable{
+
+ private Optional<HiveLockObject> lock;
+ private HiveLock lockObj;
+
+ public LocalTableLock(Optional<HiveLockObject> lock) throws LockException {
+
+ this.lock = lock;
+ if(!lock.isPresent()) {
+ return;
+ }
+ LOG.info("LocalTableLock; locking: " + lock);
+ HiveLockManager lockMgr = context.getHiveTxnManager().getLockManager();
+ lockObj = lockMgr.lock(lock.get(), HiveLockMode.SEMI_SHARED, true);
+ LOG.info("LocalTableLock; locked: " + lock);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(!lock.isPresent()) {
+ return;
+ }
+ LOG.info("LocalTableLock; unlocking: "+lock);
+ HiveLockManager lockMgr;
+ try {
+ lockMgr = context.getHiveTxnManager().getLockManager();
+ lockMgr.unlock(lockObj);
+ } catch (LockException e1) {
+ throw new IOException(e1);
+ }
+ LOG.info("LocalTableLock; unlocked");
+ }
+
+ }
+
+ static enum LockFileMoveMode {
+ none, dp, all;
+
+ public static LockFileMoveMode fromConf(HiveConf conf) {
+ if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) {
+ return none;
+ }
+ String lockFileMoveMode =
conf.getVar(HiveConf.ConfVars.HIVE_LOCK_FILE_MOVE_MODE);
+ return valueOf(lockFileMoveMode);
+ }
+ }
+
+ private LocalTableLock acquireLockForFileMove(LoadTableDesc loadTableWork)
throws HiveException {
+ LockFileMoveMode mode = LockFileMoveMode.fromConf(conf);
+
+ if (mode == LockFileMoveMode.none) {
+ return new LocalTableLock(Optional.empty());
+ }
+ if (mode == LockFileMoveMode.dp) {
+ if (loadTableWork.getDPCtx() == null) {
+ return new LocalTableLock(Optional.empty());
+ }
+ }
+
+ WriteEntity output = context.getLoadTableOutputMap().get(loadTableWork);
+ List<HiveLockObj> lockObjects = context.getOutputLockObjects().get(output);
+ if (lockObjects == null) {
+ return new LocalTableLock(Optional.empty());
+ }
+ TableDesc table = loadTableWork.getTable();
+ if(table == null) {
+ return new LocalTableLock(Optional.empty());
+ }
+
+ Hive db = getHive();
+ Table baseTable = db.getTable(loadTableWork.getTable().getTableName());
+
+ HiveLockObject.HiveLockObjectData lockData =
+ new HiveLockObject.HiveLockObjectData(queryPlan.getQueryId(),
+ String.valueOf(System.currentTimeMillis()),
+ "IMPLICIT",
+ queryPlan.getQueryStr(),
+ conf);
+
+ HiveLockObject lock = new HiveLockObject(baseTable,lockData);
+
+ for (HiveLockObj hiveLockObj : lockObjects) {
Review comment:
I tried to use stream api a few times - and I'm a bit against it because:
* it's somewhat unreadable
* it's harder to read them when they eventually changed in a patch
* not really debug friendly
* they seem to fit "easy tasks" nicely; but if someone extends it later - it
might become a really hard to read expression...
instead of 2 complex lines; this is 7 simple - I would rather keep it
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]