[ 
https://issues.apache.org/jira/browse/HIVE-23789?focusedWorklogId=453281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-453281
 ]

ASF GitHub Bot logged work on HIVE-23789:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jul/20 07:54
            Start Date: 01/Jul/20 07:54
    Worklog Time Spent: 10m 
      Work Description: pvary commented on a change in pull request #1194:
URL: https://github.com/apache/hive/pull/1194#discussion_r448183060



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
##########
@@ -288,15 +313,231 @@ private void acquireLocksInternal() throws 
CommandProcessorException, LockExcept
     }
   }
 
-  public void addHiveLocksFromContext() {
+  /**
+   *  Write the current set of valid write ids for the ACID tables being 
operated on into the configuration so
+   *  that it can be read by the input format.
+   */
+  private ValidTxnWriteIdList recordValidWriteIds() throws LockException {
+    String txnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (Strings.isNullOrEmpty(txnString)) {
+      throw new IllegalStateException("calling recordValidWriteIds() without 
initializing ValidTxnList " +
+          
JavaUtils.txnIdToString(driverContext.getTxnManager().getCurrentTxnId()));
+    }
+
+    ValidTxnWriteIdList txnWriteIds = getTxnWriteIds(txnString);
+    setValidWriteIds(txnWriteIds);
+
+    LOG.debug("Encoding valid txn write ids info {} txnid: {}", 
txnWriteIds.toString(),
+        driverContext.getTxnManager().getCurrentTxnId());
+    return txnWriteIds;
+  }
+
+  private ValidTxnWriteIdList getTxnWriteIds(String txnString) throws 
LockException {
+    List<String> txnTables = getTransactionalTables(getTables(true, true));
+    ValidTxnWriteIdList txnWriteIds = null;
+    if (driverContext.getCompactionWriteIds() != null) {
+      // This is kludgy: here we need to read with Compactor's snapshot/txn 
rather than the snapshot of the current
+      // {@code txnMgr}, in effect simulating a "flashback query" but can't 
actually share compactor's txn since it
+      // would run multiple statements.  See more comments in {@link 
org.apache.hadoop.hive.ql.txn.compactor.Worker}
+      // where it starts the compactor txn
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + 
txnTables);
+      }
+      txnWriteIds = new ValidTxnWriteIdList(driverContext.getCompactorTxnId());
+      
txnWriteIds.addTableValidWriteIdList(driverContext.getCompactionWriteIds());
+    } else {
+      txnWriteIds = driverContext.getTxnManager().getValidWriteIds(txnTables, 
txnString);
+    }
+    if (driverContext.getTxnType() == TxnType.READ_ONLY && !getTables(false, 
true).isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Inferred transaction type '%s' doesn't conform to the actual query 
string '%s'",
+          driverContext.getTxnType(), 
driverContext.getQueryState().getQueryString()));
+    }
+    return txnWriteIds;
+  }
+
+  private void setValidWriteIds(ValidTxnWriteIdList txnWriteIds) {
+    driverContext.getConf().set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, 
txnWriteIds.toString());
+    if (driverContext.getPlan().getFetchTask() != null) {
+      // This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} 
optimization which initializes JobConf
+      // in FetchOperator before recordValidTxns() but this has to be done 
after locks are acquired to avoid race
+      // conditions in ACID. This case is supported only for single source 
query.
+      Operator<?> source = 
driverContext.getPlan().getFetchTask().getWork().getSource();
+      if (source instanceof TableScanOperator) {
+        TableScanOperator tsOp = (TableScanOperator)source;
+        String fullTableName = 
AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(),
+            tsOp.getConf().getTableName());
+        ValidWriteIdList writeIdList = 
txnWriteIds.getTableValidWriteIdList(fullTableName);
+        if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) {
+          throw new IllegalStateException(String.format(
+              "ACID table: %s is missing from the ValidWriteIdList config: 
%s", fullTableName, txnWriteIds.toString()));
+        }
+        if (writeIdList != null) {
+          
driverContext.getPlan().getFetchTask().setValidWriteIdList(writeIdList.toString());
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks whether txn list has been invalidated while planning the query.
+   * This would happen if query requires exclusive/semi-shared lock, and there 
has been a committed transaction
+   * on the table over which the lock is required.
+   */
+  boolean isValidTxnListState() throws LockException {
+    // 1) Get valid txn list.
+    String txnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (txnString == null) {
+      return true; // Not a transactional op, nothing more to do
+    }
+
+    // 2) Get locks that are relevant:
+    // - Exclusive for INSERT OVERWRITE, when shared write is disabled 
(HiveConf.TXN_WRITE_X_LOCK=false).
+    // - Excl-write for UPDATE/DELETE, when shared write is disabled, INSERT 
OVERWRITE - when enabled.
+    Set<String> nonSharedLockedTables = getNonSharedLockedTables();
+    if (nonSharedLockedTables.isEmpty()) {
+      return true; // Nothing to check
+    }
+
+    // 3) Get txn tables that are being written
+    String txnWriteIdListString = 
driverContext.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+    if (Strings.isNullOrEmpty(txnWriteIdListString)) {
+      return true; // Nothing to check
+    }
+
+    GetOpenTxnsResponse openTxns = driverContext.getTxnManager().getOpenTxns();
+    ValidTxnList validTxnList = 
TxnCommonUtils.createValidReadTxnList(openTxns, 0);
+    long txnId = driverContext.getTxnManager().getCurrentTxnId();
+
+    String currentTxnString;
+    if (validTxnList.isTxnRangeValid(txnId + 1, 
openTxns.getTxn_high_water_mark()) != ValidTxnList.RangeResponse.NONE) {
+      // If here, there was another txn opened & committed between current 
SNAPSHOT generation and locking.
+      validTxnList.removeException(txnId);
+      currentTxnString = validTxnList.toString();
+    } else {
+      currentTxnString = TxnCommonUtils.createValidReadTxnList(openTxns, 
txnId).toString();
+    }
+
+    if (currentTxnString.equals(txnString)) {
+      return true; // Still valid, nothing more to do
+    }
+    return checkWriteIds(currentTxnString, nonSharedLockedTables, 
txnWriteIdListString);
+  }
+
+  private Set<String> getNonSharedLockedTables() {
+    if (CollectionUtils.isEmpty(context.getHiveLocks())) {
+      return Collections.emptySet(); // Nothing to check
+    }
+
+    Set<String> nonSharedLockedTables = new HashSet<>();
+    for (HiveLock lock : context.getHiveLocks()) {
+      if (lock.mayContainComponents()) {
+        // The lock may have multiple components, e.g., DbHiveLock, hence we 
need to check for each of them
+        for (LockComponent lockComponent : lock.getHiveLockComponents()) {
+          // We only consider tables for which we hold either an exclusive or 
an excl-write lock
+          if ((lockComponent.getType() == LockType.EXCLUSIVE || 
lockComponent.getType() == LockType.EXCL_WRITE) &&
+              lockComponent.getTablename() != null && 
!DbTxnManager.GLOBAL_LOCKS.equals(lockComponent.getDbname())) {
+            
nonSharedLockedTables.add(TableName.getDbTable(lockComponent.getDbname(), 
lockComponent.getTablename()));
+          }
+        }
+      } else {
+        // The lock has a single component, e.g., SimpleHiveLock or 
ZooKeeperHiveLock.
+        // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+        if ((lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE || 
lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
+            lock.getHiveLockObject().getPaths().length == 2) {
+          nonSharedLockedTables.add(
+              TableName.getDbTable(lock.getHiveLockObject().getPaths()[0], 
lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    return nonSharedLockedTables;
+  }
+
+  private boolean checkWriteIds(String currentTxnString, Set<String> 
nonSharedLockedTables, String txnWriteIdListString)
+      throws LockException {
+    ValidTxnWriteIdList txnWriteIdList = new 
ValidTxnWriteIdList(txnWriteIdListString);
+    Map<String, Table> writtenTables = getTables(false, true);
+
+    ValidTxnWriteIdList currentTxnWriteIds = 
driverContext.getTxnManager().getValidWriteIds(
+        getTransactionalTables(writtenTables), currentTxnString);
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = 
TableName.getDbTable(tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
+      if (nonSharedLockedTables.contains(fullQNameForLock)) {
+        // Check if table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
+          ValidWriteIdList writeIdList = 
txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey());
+          ValidWriteIdList currentWriteIdList = 
currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey());
+          // Check if there was a conflicting write between current SNAPSHOT 
generation and locking.
+          if 
(currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1,
+              currentWriteIdList.getHighWatermark()) != 
ValidWriteIdList.RangeResponse.NONE) {
+            return false;
+          }
+          // Check that write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(writeIdList, 
currentWriteIdList)) {
+            // Write id has changed, it is not valid anymore, we need to 
recompile
+            return false;
+          }
+        }
+        nonSharedLockedTables.remove(fullQNameForLock);
+      }
+    }
+
+    if (!nonSharedLockedTables.isEmpty()) {
+      throw new LockException("Wrong state: non-shared locks contain 
information for tables that have not" +
+          " been visited when trying to validate the locks from query 
tables.\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
+          "Remaining locks after check: " + nonSharedLockedTables);
+    }
+
+    return true; // It passes the test, it is valid
+  }
+
+  private Map<String, Table> getTables(boolean inputNeeded, boolean 
outputNeeded) {
+    Map<String, Table> tables = new HashMap<>();
+    if (inputNeeded) {
+      driverContext.getPlan().getInputs().forEach(input -> 
addTableFromEntity(input, tables));
+    }
+    if (outputNeeded) {
+      driverContext.getPlan().getOutputs().forEach(output -> 
addTableFromEntity(output, tables));
+    }
+    return tables;
+  }
+
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
+    Table table;
+    switch (entity.getType()) {
+    case TABLE:
+      table = entity.getTable();
+      break;
+    case PARTITION:
+    case DUMMYPARTITION:
+      table = entity.getPartition().getTable();
+      break;
+    default:
+      return;
+    }
+    String fullTableName = AcidUtils.getFullTableName(table.getDbName(), 
table.getTableName());
+    tables.put(fullTableName, table);
+  }
+
+  private List<String> getTransactionalTables(Map<String, Table> tables) {
+    return tables.entrySet().stream()
+      .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+      .map(Map.Entry::getKey)
+      .collect(Collectors.toList());
+  }
+
+  void addHiveLocksFromContext() {

Review comment:
       I do not like storing lock related stuff in context. Shouldn't this be 
private to the DbTxnHandler?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 453281)
    Time Spent: 20m  (was: 10m)

> Merge ValidTxnManager into DriverTxnHandler
> -------------------------------------------
>
>                 Key: HIVE-23789
>                 URL: https://issues.apache.org/jira/browse/HIVE-23789
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: Miklos Gergely
>            Assignee: Miklos Gergely
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to