http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index a90b7d4..ba006cf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -286,8 +286,9 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -302,7 +303,7 @@ class CompactionTxnHandler extends TxnHandler { default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } info.runAs = rs.getString(6); - info.highestTxnId = rs.getLong(7); + info.highestWriteId = rs.getLong(7); rc.add(info); } LOG.debug("Going to rollback"); @@ -338,7 +339,7 @@ class CompactionTxnHandler extends TxnHandler { ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, info.id); rs = pStmt.executeQuery(); if(rs.next()) { @@ -358,21 +359,21 @@ class CompactionTxnHandler extends TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); // Remove entries from completed_txn_components as well, so we don't start looking there - // again but only up to the highest txn ID include in this compaction job. - //highestTxnId will be NULL in upgrade scenarios + // again but only up to the highest write ID included in this compaction job.
+ //highestWriteId will be NULL in upgrade scenarios s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + "ctc_table = ?"; if (info.partName != null) { s += " and ctc_partition = ?"; } - if(info.highestTxnId != 0) { - s += " and ctc_txnid <= ?"; + if(info.highestWriteId != 0) { + s += " and ctc_writeid <= ?"; } pStmt = dbConn.prepareStatement(s); int paramCount = 1; @@ -381,8 +382,8 @@ class CompactionTxnHandler extends TxnHandler { if (info.partName != null) { pStmt.setString(paramCount++, info.partName); } - if(info.highestTxnId != 0) { - pStmt.setLong(paramCount++, info.highestTxnId); + if(info.highestWriteId != 0) { + pStmt.setLong(paramCount++, info.highestWriteId); } LOG.debug("Going to execute update <" + s + ">"); if (pStmt.executeUpdate() < 1) { @@ -392,15 +393,15 @@ class CompactionTxnHandler extends TxnHandler { s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = ? and tc_table = ?"; - if (info.highestTxnId != 0) s += " and txn_id <= ?"; + if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; if (info.partName != null) s += " and tc_partition = ?"; pStmt = dbConn.prepareStatement(s); paramCount = 1; pStmt.setString(paramCount++, info.dbname); pStmt.setString(paramCount++, info.tableName); - if(info.highestTxnId != 0) { - pStmt.setLong(paramCount++, info.highestTxnId); + if(info.highestWriteId != 0) { + pStmt.setLong(paramCount++, info.highestWriteId); } if (info.partName != null) { pStmt.setString(paramCount++, info.partName); @@ -700,14 +701,14 @@ class CompactionTxnHandler extends TxnHandler { */ @Override @RetrySemantics.Idempotent - public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { + public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException { Connection dbConn = null; Statement stmt = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId + + int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId + " WHERE CQ_ID = " + ci.id); if(updCount != 1) { throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); @@ -715,14 +716,14 @@ class CompactionTxnHandler extends TxnHandler { dbConn.commit(); } catch (SQLException e) { rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")"); + checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")"); throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { close(null, stmt, dbConn); } } catch (RetryException ex) { - setCompactionHighestTxnId(ci, highestTxnId); + setCompactionHighestWriteId(ci, highestWriteId); } } private static class RetentionCounters { @@ -932,7 +933,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, 
CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, ci.id); rs = pStmt.executeQuery(); if(rs.next()) { @@ -966,7 +967,7 @@ class CompactionTxnHandler extends TxnHandler { close(rs, stmt, null); closeStmt(pStmt); - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); int updCount = pStmt.executeUpdate(); LOG.debug("Going to commit");
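Taken together, the CompactionTxnHandler changes above move the compactor's bookkeeping from transaction ids to per-table write ids. As a standalone illustration of the cleanup step in markCleaned() -- a minimal sketch, not code from this patch -- the method below drops the COMPLETED_TXN_COMPONENTS rows that a finished compaction has covered. Table and column names follow the schema in this commit; the Connection is assumed to come from the metastore's pool, and highestWriteId == 0 stands in for the upgrade case where CQ_HIGHEST_WRITE_ID is NULL.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class CompletedTxnComponentsCleanup {
  // Mirrors the delete that markCleaned() builds: filter by db/table, optionally by
  // partition, and optionally by the highest write id the compaction included.
  static int deleteCoveredComponents(Connection dbConn, String db, String table,
      String partition, long highestWriteId) throws SQLException {
    StringBuilder sql = new StringBuilder(
        "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and ctc_table = ?");
    if (partition != null) {
      sql.append(" and ctc_partition = ?");
    }
    if (highestWriteId != 0) {
      // 0 means "unknown" (row written before the upgrade); apply no write-id bound.
      sql.append(" and ctc_writeid <= ?");
    }
    try (PreparedStatement ps = dbConn.prepareStatement(sql.toString())) {
      int p = 1;
      ps.setString(p++, db);
      ps.setString(p++, table);
      if (partition != null) {
        ps.setString(p++, partition);
      }
      if (highestWriteId != 0) {
        ps.setLong(p++, highestWriteId);
      }
      return ps.executeUpdate();
    }
  }
}

Rows above the bound are deliberately kept, so the Initiator can still see writes that landed after this compaction was scheduled.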
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index e724723..88f6346 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -86,16 +86,29 @@ public final class TxnDbUtil { " TC_DATABASE varchar(128) NOT NULL," + " TC_TABLE varchar(128)," + " TC_PARTITION varchar(767)," + - " TC_OPERATION_TYPE char(1) NOT NULL)"); + " TC_OPERATION_TYPE char(1) NOT NULL," + + " TC_WRITEID bigint)"); stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + " CTC_TXNID bigint," + " CTC_DATABASE varchar(128) NOT NULL," + " CTC_TABLE varchar(128)," + " CTC_PARTITION varchar(767)," + " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," + - " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)"); + " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," + + " CTC_WRITEID bigint)"); stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); + + stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" + + " T2W_TXNID bigint NOT NULL," + + " T2W_DATABASE varchar(128) NOT NULL," + + " T2W_TABLE varchar(256) NOT NULL," + + " T2W_WRITEID bigint NOT NULL)"); + stmt.execute("CREATE TABLE NEXT_WRITE_ID (" + + " NWI_DATABASE varchar(128) NOT NULL," + + " NWI_TABLE varchar(256) NOT NULL," + + " NWI_NEXT bigint NOT NULL)"); + stmt.execute("CREATE TABLE HIVE_LOCKS (" + " HL_LOCK_EXT_ID bigint NOT NULL," + " HL_LOCK_INT_ID bigint NOT NULL," + @@ -130,7 +143,7 @@ public final class TxnDbUtil { " CQ_WORKER_ID varchar(128)," + " CQ_START bigint," + " CQ_RUN_AS varchar(128)," + - " CQ_HIGHEST_TXN_ID bigint," + + " CQ_HIGHEST_WRITE_ID bigint," + " CQ_META_INFO varchar(2048) for bit data," + " CQ_HADOOP_JOB_ID varchar(32))"); @@ -138,20 +151,20 @@ public final class TxnDbUtil { stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" + - " CC_ID bigint PRIMARY KEY," + - " CC_DATABASE varchar(128) NOT NULL," + - " CC_TABLE varchar(128) NOT NULL," + - " CC_PARTITION varchar(767)," + - " CC_STATE char(1) NOT NULL," + - " CC_TYPE char(1) NOT NULL," + - " CC_TBLPROPERTIES varchar(2048)," + - " CC_WORKER_ID varchar(128)," + - " CC_START bigint," + - " CC_END bigint," + - " CC_RUN_AS varchar(128)," + - " CC_HIGHEST_TXN_ID bigint," + - " CC_META_INFO varchar(2048) for bit data," + - " CC_HADOOP_JOB_ID varchar(32))"); + " CC_ID bigint PRIMARY KEY," + + " CC_DATABASE varchar(128) NOT NULL," + + " CC_TABLE varchar(128) NOT NULL," + + " CC_PARTITION varchar(767)," + + " CC_STATE char(1) NOT NULL," + + " CC_TYPE char(1) NOT NULL," + + " CC_TBLPROPERTIES varchar(2048)," + + " CC_WORKER_ID varchar(128)," + + " CC_START bigint," + + " CC_END bigint," + + " CC_RUN_AS varchar(128)," + + " CC_HIGHEST_WRITE_ID bigint," + + " CC_META_INFO varchar(2048) for bit data," + + " CC_HADOOP_JOB_ID varchar(32))"); stmt.execute("CREATE TABLE AUX_TABLE (" + " MT_KEY1 varchar(128) NOT NULL," + @@ -219,6 +232,8 @@ public final class TxnDbUtil { success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); 
success &= dropTable(stmt, "TXNS", retryCount); success &= dropTable(stmt, "NEXT_TXN_ID", retryCount); + success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount); + success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount); success &= dropTable(stmt, "HIVE_LOCKS", retryCount); success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount); success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 1bb976c..ac61715 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -61,6 +61,7 @@ import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; @@ -69,6 +70,8 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; @@ -80,6 +83,8 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; @@ -102,10 +107,12 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -819,8 +826,8 @@ abstract class TxnHandler 
implements TxnStore, TxnStore.MutexAPI { // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + - "ctc_table, ctc_partition) select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " + + "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); int modCount = 0; if ((modCount = stmt.executeUpdate(s)) < 1) { @@ -869,6 +876,244 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } @Override + @RetrySemantics.ReadOnly + public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) + throws NoSuchTxnException, MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + ValidTxnList validTxnList; + + // We should prepare the valid write ids list based on the validTxnList of the current txn. + // If the caller has no open txn, it passes null for validTxnList, so we need to fetch the + // current state of txns to build the validTxnList. + if (rqst.isSetValidTxnList()) { + validTxnList = new ValidReadTxnList(rqst.getValidTxnList()); + } else { + // Passing 0 for currentTxn means this validTxnList is not relative to any txn + validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0); + } + try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + // Get the valid write id list for all the tables read by the current txn + List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>(); + for (String fullTableName : rqst.getFullTableNames()) { + tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList)); + } + + LOG.debug("Going to rollback"); + dbConn.rollback(); + GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList); + return owr; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getValidWriteIds"); + throw new MetaException("Unable to select from transaction database, " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + } + } catch (RetryException e) { + return getValidWriteIds(rqst); + } + } + + // Method to get the valid write ids list for the given table + // Input fullTableName is expected to be of format <db_name>.<table_name> + private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName, + ValidTxnList validTxnList) throws SQLException { + ResultSet rs = null; + String[] names = TxnUtils.getDbTableName(fullTableName); + try { + // Initialize to 0 to make sure that, if nobody has modified this table, the current txn + // doesn't read any data + long writeIdHwm = 0; + List<Long> invalidWriteIdList = new ArrayList<>(); + long txnHwm = validTxnList.getHighWatermark(); + + // The output includes all the txns which are under the high water mark. It includes + // the committed transactions as well. The results should be sorted in ascending order based + // on write id. The sorting is needed as the exceptions list in ValidWriteIdList is looked up + // using binary search.
+ String s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm + + " and t2w_database = " + quoteString(names[0]) + + " and t2w_table = " + quoteString(names[1]) + + " order by t2w_writeid asc"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + long minOpenWriteId = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + while (rs.next()) { + long txnId = rs.getLong(1); + long writeId = rs.getLong(2); + writeIdHwm = Math.max(writeIdHwm, writeId); + if (validTxnList.isTxnValid(txnId)) { + // Skip if the transaction under evaluation is already committed. + continue; + } + + // The txn under evaluation is either in open or aborted state. + // Mark the write id's state as per the txn state. + if (validTxnList.isTxnAborted(txnId)) { + invalidWriteIdList.add(writeId); + abortedBits.set(invalidWriteIdList.size() - 1); + } else { + invalidWriteIdList.add(writeId); + minOpenWriteId = Math.min(minOpenWriteId, writeId); + } + } + + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer); + if (minOpenWriteId < Long.MAX_VALUE) { + owi.setMinOpenWriteId(minOpenWriteId); + } + return owi; + } finally { + close(rs); + } + } + + @Override + public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + List<Long> txnIds = rqst.getTxnIds(); + String dbName = rqst.getDbName().toLowerCase(); + String tblName = rqst.getTableName().toLowerCase(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + TxnStore.MutexAPI.LockHandle handle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + Collections.sort(txnIds); //easier to read logs + + // Check if all the input txns are in open state. Write ID should be allocated only for open transactions. + if (!isTxnsInOpenState(txnIds, stmt)) { + ensureAllTxnsValid(dbName, tblName, txnIds, stmt); + throw new RuntimeException("This should never happen for txnIds: " + txnIds); + } + + List<TxnToWriteId> txnToWriteIds = new ArrayList<>(); + List<Long> allocatedTxns = new ArrayList<>(); + long txnId; + long writeId; + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Traverse TXN_TO_WRITE_ID to see if any of the input txns have already been allocated a + // write id for the same db.table. If so, it must be reused; otherwise a new one has to be allocated. + // The write id would already have been allocated in the case of multi-statement txns, where + // the first write on a table allocates a write id and the rest of the writes reuse it.
+ prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where" + + " t2w_database = " + quoteString(dbName) + + " and t2w_table = " + quoteString(tblName) + " and "); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, + txnIds, "t2w_txnid", false, false); + for (String query : queries) { + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); + while (rs.next()) { + // If table write ID is already allocated for the given transaction, then just use it + txnId = rs.getLong(1); + writeId = rs.getLong(2); + txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); + allocatedTxns.add(txnId); + LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId); + } + } + + // If all the txns in the list have already allocated write ids, then just skip new allocations + long numOfWriteIds = txnIds.size() - allocatedTxns.size(); + assert(numOfWriteIds >= 0); + if (0 == numOfWriteIds) { + // If all the txns in the list have pre-allocated write ids for the given table, then just return + return new AllocateTableWriteIdsResponse(txnToWriteIds); + } + + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + + // There are some txns in the list which has no write id allocated and hence go ahead and do it. + // Get the next write id for the given table and update it with new next write id. + // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID + String s = sqlGenerator.addForUpdateClause( + "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName)); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + // First allocation of write id should add the table to the next_write_id meta table + // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here + s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")"; + LOG.debug("Going to execute insert <" + s + ">"); + stmt.execute(s); + writeId = 1; + } else { + // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated + writeId = rs.getLong(1); + s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds) + + " where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + + // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated + // write ids + List<String> rows = new ArrayList<>(); + for (long txn : txnIds) { + if (allocatedTxns.contains(txn)) { + continue; + } + rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + txnToWriteIds.add(new TxnToWriteId(txn, writeId)); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); + writeId++; + } + + // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + return new AllocateTableWriteIdsResponse(txnToWriteIds); 
+ } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } catch (RetryException e) { + return allocateTableWriteIds(rqst); + } + } + + @Override @RetrySemantics.SafeToRetry public void performWriteSetGC() { Connection dbConn = null; @@ -1122,13 +1367,30 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String dbName = lc.getDbname(); String tblName = lc.getTablename(); String partName = lc.getPartitionname(); + Long writeId = null; + if (tblName != null) { + // It is assumed the caller have already allocated write id for adding/updating data to + // the acid tables. However, DDL operatons won't allocate write id and hence this query + // may return empty result sets. + // Get the write id allocated by this txn for the given table writes + s = "select t2w_writeid from TXN_TO_WRITE_ID where" + + " t2w_database = " + quoteString(dbName.toLowerCase()) + + " and t2w_table = " + quoteString(tblName.toLowerCase()) + + " and t2w_txnid = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + writeId = rs.getLong(1); + } + } rows.add(txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'")+ "," + - quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())); + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? "null" : "'" + partName + "'")+ "," + + quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," + + (writeId == null ? 
"null" : writeId)); } List<String> queries = sqlGenerator.createInsertValuesStmt( - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows); + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows); for(String query : queries) { LOG.debug("Going to execute update <" + query + ">"); int modCount = stmt.executeUpdate(query); @@ -1810,7 +2072,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { Connection dbConn = null; Statement stmt = null; ResultSet lockHandle = null; - ResultSet rs = null; try { try { lockInternal(); @@ -1827,15 +2088,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if(rqst.isSetOperationType()) { ot = OpertaionType.fromDataOperationType(rqst.getOperationType()); } + + Long writeId = rqst.getWriteid(); List<String> rows = new ArrayList<>(); for (String partName : rqst.getPartitionnames()) { rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + - "," + quoteString(partName) + "," + quoteChar(ot.sqlConst)); + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId); } int modCount = 0; //record partitions that were written to List<String> queries = sqlGenerator.createInsertValuesStmt( - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows); + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows); for(String query : queries) { LOG.debug("Going to execute update <" + query + ">"); modCount = stmt.executeUpdate(query); @@ -1880,7 +2143,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { StringBuilder buff = new StringBuilder(); switch (type) { - case DATABASE: + case DATABASE: { dbName = db.getName(); buff.append("delete from TXN_COMPONENTS where tc_database='"); @@ -1906,8 +2169,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { buff.append("'"); queries.add(buff.toString()); + buff.setLength(0); + buff.append("delete from TXN_TO_WRITE_ID where t2w_database='"); + buff.append(dbName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from NEXT_WRITE_ID where nwi_database='"); + buff.append(dbName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + break; - case TABLE: + } + case TABLE: { dbName = table.getDbName(); tblName = table.getTableName(); @@ -1942,8 +2218,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { buff.append("'"); queries.add(buff.toString()); + buff.setLength(0); + buff.append("delete from TXN_TO_WRITE_ID where t2w_database='"); + buff.append(dbName.toLowerCase()); + buff.append("' and t2w_table='"); + buff.append(tblName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from NEXT_WRITE_ID where nwi_database='"); + buff.append(dbName.toLowerCase()); + buff.append("' and nwi_table='"); + buff.append(tblName.toLowerCase()); + buff.append("'"); + queries.add(buff.toString()); + break; - case PARTITION: + } + case PARTITION: { dbName = table.getDbName(); tblName = table.getTableName(); List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns @@ -1996,8 +2289,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } break; - default: + } + default: { throw new MetaException("Invalid object type for cleanup: " + type); + } } for (String query : 
queries) { @@ -3003,6 +3298,115 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** + * Checks if all the txns in the list are in open state. + * @param txnIds list of txns to be evaluated for open state + * @param stmt db statement + * @return true if all the txns are in open state, else false + */ + private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException { + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Count how many txns from the given list are in open state. If the returned count equals + // the number of input txns, then all of them are in open state. + prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and "); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, + txnIds, "txn_id", false, false); + + long count = 0; + for (String query : queries) { + LOG.debug("Going to execute query <" + query + ">"); + ResultSet rs = stmt.executeQuery(query); + if (rs.next()) { + count += rs.getLong(1); + } + } + return count == txnIds.size(); + } + + /** + * Checks that all the txns in the list are valid for write id allocation; throws if any of them + * is aborted or already committed. + * @param dbName Database name + * @param tblName Table on which we try to allocate a write id + * @param txnIds list of txns to be validated + * @param stmt db statement + */ + private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt) + throws SQLException { + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Check if any of the txns in the list is aborted. + prefix.append("select txn_id, txn_state from TXNS where "); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, + txnIds, "txn_id", false, false); + Long txnId; + char txnState; + boolean isAborted = false; + StringBuilder errorMsg = new StringBuilder(); + errorMsg.append("Write ID allocation on ") + .append(Warehouse.getQualifiedName(dbName, tblName)) + .append(" failed for input txns: "); + for (String query : queries) { + LOG.debug("Going to execute query <" + query + ">"); + ResultSet rs = stmt.executeQuery(query); + while (rs.next()) { + txnId = rs.getLong(1); + txnState = rs.getString(2).charAt(0); + if (txnState != TXN_OPEN) { + isAborted = true; + errorMsg.append("{").append(txnId).append(",").append(txnState).append("}"); + } + } + } + // Check if any of the txns in the list is committed. + boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg); + if (isAborted || isCommitted) { + LOG.error(errorMsg.toString()); + throw new IllegalStateException("Write ID allocation failed on " + + Warehouse.getQualifiedName(dbName, tblName) + + " as not all input txns are in open state"); + } + } + + /** + * Checks if any of the txns in the list is committed. + * @param txnIds list of txns to be evaluated for committed state + * @param stmt db statement + * @return true if any input txn is committed, else false + */ + private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg) + throws SQLException { + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Check if any of the txns in the list is committed.
+ prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where "); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, + txnIds, "ctc_txnid", false, false); + Long txnId; + boolean isCommitted = false; + for (String query : queries) { + LOG.debug("Going to execute query <" + query + ">"); + ResultSet rs = stmt.executeQuery(query); + while (rs.next()) { + isCommitted = true; + txnId = rs.getLong(1); + if (errorMsg != null) { + errorMsg.append("{").append(txnId).append(",c}"); + } + } + } + return isCommitted; + } + + /** * Used to raise an informative error when the caller expected a txn in a particular TxnStatus * but found it in some other status */ http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 3e27034..38fa0e2 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -38,8 +38,10 @@ import java.util.Set; @InterfaceStability.Evolving public interface TxnStore extends Configurable { - enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, - WriteSetCleaner, CompactionScheduler} + enum MUTEX_KEY { + Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, + WriteSetCleaner, CompactionScheduler, WriteIdAllocator + } // Compactor states (Should really be enum) String INITIATED_RESPONSE = "initiated"; String WORKING_RESPONSE = "working"; @@ -123,6 +125,25 @@ public interface TxnStore extends Configurable { public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( String inputDbName, String inputTableName, ValidTxnList txnList) throws MetaException; + /** + * Gets the list of valid write ids for the given tables with respect to the current txn + * @param rqst info on transaction and list of table names associated with given transaction + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) + throws NoSuchTxnException, MetaException; + + /** + * Allocate a write ID for the given table and associate it with a transaction + * @param rqst info on transaction and table to allocate write id + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; /** * Obtain a lock. @@ -206,7 +227,7 @@ public interface TxnStore extends Configurable { CompactionResponse compact(CompactionRequest rqst) throws MetaException; /** - * Show list of current compactions + * Show list of current compactions. * @param rqst info on which compactions to show * @return compaction information * @throws MetaException @@ -226,7 +247,7 @@ throws NoSuchTxnException, TxnAbortedException, MetaException; /** - * Clean up corresponding records in metastore tables + * Clean up corresponding records in metastore tables.
* @param type Hive object type * @param db database object * @param table table object @@ -350,10 +371,10 @@ public interface TxnStore extends Configurable { List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException; /** - * Record the highest txn id that the {@code ci} compaction job will pay attention to. + * Record the highest write id that the {@code ci} compaction job will pay attention to. */ @RetrySemantics.Idempotent - void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; + void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException; /** * For any given compactable entity (partition, table if not partitioned) the history of compactions http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 027fb3f..7b02865 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.ValidCompactorTxnList; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TxnInfo; -import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -63,53 +64,94 @@ public class TxnUtils { BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; int i = 0; - for(long txn: open) { + for (long txn : open) { if (currentTxn > 0 && currentTxn == txn) continue; exceptions[i++] = txn; } - if(txns.isSetMin_open_txn()) { + if (txns.isSetMin_open_txn()) { return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn()); - } - else { + } else { return new ValidReadTxnList(exceptions, abortedBits, highWater); } } /** - * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a - * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to - * compact the files, and thus treats only open transactions as invalid. Additionally any - * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like - * delta_17_120 where txnId 80, for example, is still open. 
- * @param txns txn list from the metastore - * @return a valid txn list. + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. + * @param currentTxnId current txn ID for which we get the valid write ids list + * @param validWriteIds valid write ids list from the metastore + * @return a valid write IDs list for the whole transaction. */ - public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { - //highWater is the last txn id that has been allocated - long highWater = txns.getTxn_high_water_mark(); - long minOpenTxn = Long.MAX_VALUE; - long[] exceptions = new long[txns.getOpen_txnsSize()]; + public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId, + GetValidWriteIdsResponse validWriteIds) { + ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId); + for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) { + validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds)); + } + return validTxnWriteIdList; + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a + * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted write ids as invalid. + * @param tableWriteIds valid write ids for the given table from the metastore + * @return a valid write IDs list for the input table + */ + public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) { + String fullTableName = tableWriteIds.getFullTableName(); + long highWater = tableWriteIds.getWriteIdHighWaterMark(); + List<Long> invalids = tableWriteIds.getInvalidWriteIds(); + BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits()); + long[] exceptions = new long[invalids.size()]; int i = 0; - for (TxnInfo txn : txns.getOpen_txns()) { - if (txn.getState() == TxnState.OPEN) { - minOpenTxn = Math.min(minOpenTxn, txn.getId()); - } - else { - //only need aborted since we don't consider anything above minOpenTxn - exceptions[i++] = txn.getId(); + for (long writeId : invalids) { + exceptions[i++] = writeId; + } + if (tableWriteIds.isSetMinOpenWriteId()) { + return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater, + tableWriteIds.getMinOpenWriteId()); + } else { + return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater); + } + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a + * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions/write ids as invalid. Additionally any + * writeId > highestOpenWriteId is also invalid. This is to avoid creating something like + * delta_17_120 where writeId 80, for example, is still open. + * @param tableValidWriteIds table write id list from the metastore + * @return a valid write id list. 
+ */ + public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) { + String fullTableName = tableValidWriteIds.getFullTableName(); + long highWater = tableValidWriteIds.getWriteIdHighWaterMark(); + long minOpenWriteId = Long.MAX_VALUE; + List<Long> invalids = tableValidWriteIds.getInvalidWriteIds(); + BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits()); + long[] exceptions = new long[invalids.size()]; + int i = 0; + int j = 0; + for (long writeId : invalids) { + // j tracks the position in invalids, which is what abortedBits is indexed by; + // i counts the entries copied into exceptions + if (abortedBits.get(j)) { + // Only need aborted since we don't consider anything above minOpenWriteId + exceptions[i++] = writeId; + } else { + minOpenWriteId = Math.min(minOpenWriteId, writeId); + } + j++; + } if(i < exceptions.length) { exceptions = Arrays.copyOf(exceptions, i); } - highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; + highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1; BitSet bitSet = new BitSet(exceptions.length); - bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted - if(minOpenTxn == Long.MAX_VALUE) { - return new ValidCompactorTxnList(exceptions, bitSet, highWater); - } - else { - return new ValidCompactorTxnList(exceptions, bitSet, highWater, minOpenTxn); + bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions is aborted + if (minOpenWriteId == Long.MAX_VALUE) { + return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater); + } else { + return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId); } } @@ -134,7 +176,7 @@ public class TxnUtils { * Note, users are responsible for using the correct TxnManager. We do not look at * SessionState.get().getTxnMgr().supportsAcid() here * Should produce the same result as - * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)} + * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}. * @return true if table is a transactional table, false otherwise */ public static boolean isTransactionalTable(Table table) { @@ -148,7 +190,7 @@ public class TxnUtils { /** * Should produce the same result as - * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)} + * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}. */ public static boolean isAcidTable(Table table) { return TxnUtils.isTransactionalTable(table) && @@ -157,6 +199,19 @@ public class TxnUtils { } /** + * Should produce the result as <dbName>.<tableName>. + */ + public static String getFullTableName(String dbName, String tableName) { + return dbName.toLowerCase() + "." + tableName.toLowerCase(); + } + + public static String[] getDbTableName(String fullTableName) { + return fullTableName.split("\\."); + } + + + + /** * Build a query (or queries if one query is too big but only for the case of 'IN' * composite clause. For the case of 'NOT IN' clauses, multiple queries change * the semantics of the intended query. @@ -357,7 +412,7 @@ public class TxnUtils { return ret; } - /* + /** * Compute and return the size of a query statement with the given parameters as input variables.
* * @param sizeSoFar size of the current contents of the buf http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql index ac28869..9d8a703 100644 --- a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql @@ -422,7 +422,8 @@ CREATE TABLE TXN_COMPONENTS ( TC_DATABASE varchar(128) NOT NULL, TC_TABLE varchar(128), TC_PARTITION varchar(767), - TC_OPERATION_TYPE char(1) NOT NULL + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint ); CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID); @@ -432,7 +433,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS ( CTC_DATABASE varchar(128) NOT NULL, CTC_TABLE varchar(256), CTC_PARTITION varchar(767), - CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL + CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, + CTC_WRITEID bigint ); CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); @@ -480,7 +482,7 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_WORKER_ID varchar(128), CQ_START bigint, CQ_RUN_AS varchar(128), - CQ_HIGHEST_TXN_ID bigint, + CQ_HIGHEST_WRITE_ID bigint, CQ_META_INFO varchar(2048) for bit data, CQ_HADOOP_JOB_ID varchar(32) ); @@ -502,7 +504,7 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_START bigint, CC_END bigint, CC_RUN_AS varchar(128), - CC_HIGHEST_TXN_ID bigint, + CC_HIGHEST_WRITE_ID bigint, CC_META_INFO varchar(2048) for bit data, CC_HADOOP_JOB_ID varchar(32) ); @@ -525,6 +527,23 @@ CREATE TABLE WRITE_SET ( WS_OPERATION_TYPE char(1) NOT NULL ); +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + -- ----------------------------------------------------------------- -- Record schema version. 
Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql index d49255a..a50c45d 100644 --- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -94,3 +94,29 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +-- 050-HIVE-18192.derby.sql +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + +RENAME COLUMN COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID; + +RENAME COLUMN COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID; + +-- Modify txn_components/completed_txn_components tables to add write id. +ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint; +ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql index 7c26d5d..1b7d0da 100644 --- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql @@ -969,7 +969,7 @@ CREATE TABLE COMPACTION_QUEUE( CQ_WORKER_ID nvarchar(128) NULL, CQ_START bigint NULL, CQ_RUN_AS nvarchar(128) NULL, - CQ_HIGHEST_TXN_ID bigint NULL, + CQ_HIGHEST_WRITE_ID bigint NULL, CQ_META_INFO varbinary(2048) NULL, CQ_HADOOP_JOB_ID nvarchar(128) NULL, PRIMARY KEY CLUSTERED @@ -990,7 +990,7 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_START bigint NULL, CC_END bigint NULL, CC_RUN_AS nvarchar(128) NULL, - CC_HIGHEST_TXN_ID bigint NULL, + CC_HIGHEST_WRITE_ID bigint NULL, CC_META_INFO varbinary(2048) NULL, CC_HADOOP_JOB_ID nvarchar(128) NULL, PRIMARY KEY CLUSTERED @@ -1004,7 +1004,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS( CTC_DATABASE nvarchar(128) NOT NULL, CTC_TABLE nvarchar(128) NULL, CTC_PARTITION nvarchar(767) NULL, - CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL + CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL, + CTC_WRITEID bigint ); CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); @@ -1072,7 +1073,8 @@ CREATE TABLE TXN_COMPONENTS( TC_DATABASE nvarchar(128) NOT NULL, TC_TABLE nvarchar(128) NULL, TC_PARTITION nvarchar(767) NULL, - TC_OPERATION_TYPE char(1) NOT NULL + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint ); ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); @@ -1129,6 
+1131,23 @@ CREATE TABLE METASTORE_DB_PROPERTIES ( ALTER TABLE METASTORE_DB_PROPERTIES ADD CONSTRAINT PROPERTY_KEY_PK PRIMARY KEY (PROPERTY_KEY); +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE nvarchar(128) NOT NULL, + T2W_TABLE nvarchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE nvarchar(128) NOT NULL, + NWI_TABLE nvarchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script -- ----------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql index 6dc3e1a..8ab466d 100644 --- a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql +++ b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql @@ -148,3 +148,29 @@ UPDATE SDS UPDATE DBS SET DB_LOCATION_URI = 's3a' + SUBSTRING(DB_LOCATION_URI, 4, LEN(DB_LOCATION_URI)) WHERE DB_LOCATION_URI LIKE 's3n://%' ; + +-- HIVE-18192 +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE nvarchar(128) NOT NULL, + T2W_TABLE nvarchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE nvarchar(128) NOT NULL, + NWI_TABLE nvarchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + +EXEC SP_RENAME 'COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID', 'CQ_HIGHEST_WRITE_ID', 'COLUMN'; + +EXEC SP_RENAME 'COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID', 'CC_HIGHEST_WRITE_ID', 'COLUMN'; + +-- Modify txn_components/completed_txn_components tables to add write id. 
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint; +ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql index 0eb2e2e..886c932 100644 --- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql +++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql @@ -962,6 +962,7 @@ CREATE TABLE TXN_COMPONENTS ( TC_TABLE varchar(128) NOT NULL, TC_PARTITION varchar(767), TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint, FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; @@ -972,7 +973,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS ( CTC_DATABASE varchar(128) NOT NULL, CTC_TABLE varchar(256), CTC_PARTITION varchar(767), - CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL + CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, + CTC_WRITEID bigint ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE; @@ -1021,7 +1023,7 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_WORKER_ID varchar(128), CQ_START bigint, CQ_RUN_AS varchar(128), - CQ_HIGHEST_TXN_ID bigint, + CQ_HIGHEST_WRITE_ID bigint, CQ_META_INFO varbinary(2048), CQ_HADOOP_JOB_ID varchar(32) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; @@ -1038,7 +1040,7 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_START bigint, CC_END bigint, CC_RUN_AS varchar(128), - CC_HIGHEST_TXN_ID bigint, + CC_HIGHEST_WRITE_ID bigint, CC_META_INFO varbinary(2048), CC_HADOOP_JOB_ID varchar(32) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; @@ -1063,6 +1065,24 @@ CREATE TABLE WRITE_SET ( WS_COMMIT_ID bigint NOT NULL, WS_OPERATION_TYPE char(1) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + -- ----------------------------------------------------------------- -- Record schema version. 
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index 0a170f6..a537734 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -133,3 +133,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = CONCAT('s3a', SUBSTR(DB_LOCATION_URI, 4, LENGTH(DB_LOCATION_URI)))
 WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE CHANGE `CQ_HIGHEST_TXN_ID` `CQ_HIGHEST_WRITE_ID` bigint;
+
+ALTER TABLE COMPLETED_COMPACTIONS CHANGE `CC_HIGHEST_TXN_ID` `CC_HIGHEST_WRITE_ID` bigint;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
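[Editor's note: the upgrade path only adds TC_WRITEID/CTC_WRITEID as nullable columns and nothing backfills them, so component rows recorded before the upgrade carry NULL write ids, which consumers must tolerate. An illustrative post-upgrade sanity check, not part of the patch:]

    -- Count pre-upgrade component rows that have no write id yet.
    SELECT COUNT(*) FROM COMPLETED_TXN_COMPONENTS WHERE CTC_WRITEID IS NULL;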
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 37f9063..366b2d9 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -936,7 +936,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE VARCHAR2(128) NOT NULL,
   TC_TABLE VARCHAR2(256),
   TC_PARTITION VARCHAR2(767) NULL,
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -946,7 +947,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE VARCHAR2(128) NOT NULL,
   CTC_TABLE VARCHAR2(128),
   CTC_PARTITION VARCHAR2(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -994,7 +996,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START NUMBER(19),
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID NUMBER(19),
+  CQ_HIGHEST_WRITE_ID NUMBER(19),
   CQ_META_INFO BLOB,
   CQ_HADOOP_JOB_ID varchar2(32)
 ) ROWDEPENDENCIES;
@@ -1016,7 +1018,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START NUMBER(19),
   CC_END NUMBER(19),
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID NUMBER(19),
+  CC_HIGHEST_WRITE_ID NUMBER(19),
   CC_META_INFO BLOB,
   CC_HADOOP_JOB_ID varchar2(32)
 ) ROWDEPENDENCIES;
@@ -1037,6 +1039,23 @@ CREATE TABLE WRITE_SET (
   WS_OPERATION_TYPE char(1) NOT NULL
 );
+
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID number(19) NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index a923d92..bd786fb 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -156,3 +156,29 @@ UPDATE SDS
 UPDATE DBS
   SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4)
 WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID number(19) NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME COLUMN CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME COLUMN CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID number(19);
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID number(19);
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 9d63056..4abf24c 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1628,7 +1628,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
   TC_PARTITION varchar(767) DEFAULT NULL,
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint
 );
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS USING hash (TC_TXNID);
@@ -1638,7 +1639,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE varchar(128) NOT NULL,
   CTC_TABLE varchar(256),
   CTC_PARTITION varchar(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint
 );
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS USING btree (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -1686,7 +1688,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START bigint,
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID bigint,
+  CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO bytea,
   CQ_HADOOP_JOB_ID varchar(32)
 );
@@ -1708,7 +1710,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START bigint,
   CC_END bigint,
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID bigint,
+  CC_HIGHEST_WRITE_ID bigint,
   CC_META_INFO bytea,
   CC_HADOOP_JOB_ID varchar(32)
 );
@@ -1729,6 +1731,23 @@ CREATE TABLE WRITE_SET (
   WS_OPERATION_TYPE char(1) NOT NULL
 );
+
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
index eb45cd2..34ed974 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
@@ -172,3 +172,29 @@ UPDATE "SDS"
 UPDATE "DBS"
   SET "DB_LOCATION_URI" = 's3a' || SUBSTR("DB_LOCATION_URI", 4)
 WHERE "DB_LOCATION_URI" LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 35fc8b3..b11ee38 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -731,6 +731,43 @@ struct CommitTxnRequest {
     1: required i64 txnid,
 }
 
+// Request msg to get the valid write ids list for the given list of tables wrt the input validTxnList
+struct GetValidWriteIdsRequest {
+    1: required list<string> fullTableNames, // Full table names of format <db_name>.<table_name>
+    2: required string validTxnList, // Valid txn list string wrt the current txn of the caller
+}
+
+// Valid write id list of one table wrt the current txn
+struct TableValidWriteIds {
+    1: required string fullTableName, // Full table name of format <db_name>.<table_name>
+    2: required i64 writeIdHighWaterMark, // The highest write id valid for this table wrt the given txn
+    3: required list<i64> invalidWriteIds, // List of open and aborted write ids in the table
+    4: optional i64 minOpenWriteId, // Minimum write id which maps to an open txn
+    5: required binary abortedBits, // Bit array to identify the aborted write ids in the invalidWriteIds list
+}
+
+// Valid write id list for all the input tables wrt the current txn
+struct GetValidWriteIdsResponse {
+    1: required list<TableValidWriteIds> tblValidWriteIds,
+}
+
+// Request msg to allocate table write ids for the given list of txns
+struct AllocateTableWriteIdsRequest {
+    1: required list<i64> txnIds,
+    2: required string dbName,
+    3: required string tableName,
+}
+
+// Maps the allocated write id to the txn for which it was allocated
+struct TxnToWriteId {
+    1: required i64 txnId,
+    2: required i64 writeId,
+}
+
+struct AllocateTableWriteIdsResponse {
+    1: required list<TxnToWriteId> txnToWriteIds,
+}
+
 struct LockComponent {
     1: required LockType type,
     2: required LockLevel level,
@@ -850,10 +887,11 @@ struct ShowCompactResponse {
 
 struct AddDynamicPartitions {
     1: required i64 txnid,
-    2: required string dbname,
-    3: required string tablename,
-    4: required list<string> partitionnames,
-    5: optional DataOperationType operationType = DataOperationType.UNSET
+    2: required i64 writeid,
+    3: required string dbname,
+    4: required string tablename,
+    5: required list<string> partitionnames,
+    6: optional DataOperationType operationType = DataOperationType.UNSET
 }
 
 struct BasicTxnInfo {
@@ -1807,6 +1845,10 @@ service ThriftHiveMetastore extends fb303.FacebookService
   void abort_txn(1:AbortTxnRequest rqst) throws (1:NoSuchTxnException o1)
   void abort_txns(1:AbortTxnsRequest rqst) throws (1:NoSuchTxnException o1)
   void commit_txn(1:CommitTxnRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
+  GetValidWriteIdsResponse get_valid_write_ids(1:GetValidWriteIdsRequest rqst)
+      throws (1:NoSuchTxnException o1, 2:MetaException o2)
+  AllocateTableWriteIdsResponse allocate_table_write_ids(1:AllocateTableWriteIdsRequest rqst)
+      throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:MetaException o3)
   LockResponse lock(1:LockRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
   LockResponse check_lock(1:CheckLockRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:NoSuchLockException o3)
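[Editor's note: the two new service calls work as a pair: allocate_table_write_ids assigns a per-table write id to each open txn, and get_valid_write_ids later converts a caller's txn snapshot into per-table valid write id lists. A rough sketch against the generated Java bindings follows; the client/transport setup, the txn ids, and the validTxns string are assumptions here, and the setter/getter names simply follow standard Thrift codegen:]

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.api.*;

    public class WriteIdDemo {
      // 'client' is an already-connected metastore handle; obtaining one is out of scope here.
      static void demo(ThriftHiveMetastore.Iface client, String validTxns) throws Exception {
        // Allocate write ids on db1.t1 for two hypothetical open txns.
        AllocateTableWriteIdsRequest alloc = new AllocateTableWriteIdsRequest();
        alloc.setTxnIds(Arrays.asList(1001L, 1002L));
        alloc.setDbName("db1");
        alloc.setTableName("t1");
        for (TxnToWriteId t2w : client.allocate_table_write_ids(alloc).getTxnToWriteIds()) {
          System.out.println("txn " + t2w.getTxnId() + " -> write id " + t2w.getWriteId());
        }

        // Resolve the caller's txn snapshot into a valid write id list for the table.
        GetValidWriteIdsRequest req = new GetValidWriteIdsRequest();
        req.setFullTableNames(Arrays.asList("db1.t1")); // <db_name>.<table_name> format
        req.setValidTxnList(validTxns);
        for (TableValidWriteIds w : client.get_valid_write_ids(req).getTblValidWriteIds()) {
          System.out.println(w.getFullTableName() + " HWM=" + w.getWriteIdHighWaterMark());
        }
      }
    }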
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
deleted file mode 100644
index 94b8c58..0000000
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common;
-
-import java.util.Arrays;
-import java.util.BitSet;
-
-/**
- * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- *
- * Compaction should only include txns up to smallest open txn (exclussive).
- * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList.
- * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that inluces any unresolved
- * transactions.  Any txn above {@code highWatermark} is unresolved.
- * These produce the logic we need to assure that the compactor only sees records less than the lowest
- * open transaction when choosing which files to compact, but that it still ignores aborted
- * records when compacting.
- *
- * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for proper
- * way to construct this.
- */
-public class ValidCompactorTxnList extends ValidReadTxnList {
-  public ValidCompactorTxnList() {
-    super();
-  }
-  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
-    this(abortedTxnList, abortedBits, highWatermark, Long.MAX_VALUE);
-  }
-  /**
-   * @param abortedTxnList list of all aborted transactions
-   * @param abortedBits bitset marking whether the corresponding transaction is aborted
-   * @param highWatermark highest committed transaction to be considered for compaction,
-   *                      equivalently (lowest_open_txn - 1).
-   */
-  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark, long minOpenTxnId) {
-    // abortedBits should be all true as everything in exceptions are aborted txns
-    super(abortedTxnList, abortedBits, highWatermark, minOpenTxnId);
-    if(this.exceptions.length <= 0) {
-      return;
-    }
-    //now that exceptions (aka abortedTxnList) is sorted
-    int idx = Arrays.binarySearch(this.exceptions, highWatermark);
-    int lastElementPos;
-    if(idx < 0) {
-      int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
-      lastElementPos = insertionPoint - 1;
-    }
-    else {
-      lastElementPos = idx;
-    }
-    /*
-     * ensure that we throw out any exceptions above highWatermark to make
-     * {@link #isTxnValid(long)} faster
-     */
-    this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
-  }
-  public ValidCompactorTxnList(String value) {
-    super(value);
-  }
-  /**
-   * Returns org.apache.hadoop.hive.common.ValidTxnList.RangeResponse.ALL if all txns in
-   * the range are resolved and RangeResponse.NONE otherwise
-   */
-  @Override
-  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
-    return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-  }
-
-  @Override
-  public boolean isTxnAborted(long txnid) {
-    return Arrays.binarySearch(exceptions, txnid) >= 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
new file mode 100644
index 0000000..9f6cf47
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * An implementation of {@link ValidWriteIdList} for use by the compactor.
+ *
+ * Compaction should only include write ids up to the smallest open write id (exclusive).
+ * There may be aborted write ids in the snapshot represented by this ValidCompactorWriteIdList.
+ * Thus {@link #isWriteIdRangeValid(long, long)} returns NONE for any range that includes any unresolved
+ * write ids.  Any write id above {@code highWatermark} is unresolved.
+ * These produce the logic we need to assure that the compactor only sees records less than the lowest
+ * open write id when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ *
+ * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactWriteIdList() for the proper
+ * way to construct this.
+ */
+public class ValidCompactorWriteIdList extends ValidReaderWriteIdList {
+  public ValidCompactorWriteIdList() {
+    super();
+  }
+  public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+                                   long highWatermark) {
+    this(tableName, abortedWriteIdList, abortedBits, highWatermark, Long.MAX_VALUE);
+  }
+  /**
+   * @param tableName table which is under compaction. Full name of format <db_name>.<table_name>
+   * @param abortedWriteIdList list of all aborted write ids
+   * @param abortedBits bitset marking whether the corresponding write id is aborted
+   * @param highWatermark highest committed write id to be considered for compaction,
+   *                      equivalently (lowest_open_write_id - 1).
+   * @param minOpenWriteId minimum write id which maps to an open transaction
+   */
+  public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+                                   long highWatermark, long minOpenWriteId) {
+    // abortedBits should be all true, since everything in exceptions is an aborted write id
+    super(tableName, abortedWriteIdList, abortedBits, highWatermark, minOpenWriteId);
+    if(this.exceptions.length <= 0) {
+      return;
+    }
+    //now that exceptions (aka abortedWriteIdList) is sorted
+    int idx = Arrays.binarySearch(this.exceptions, highWatermark);
+    int lastElementPos;
+    if(idx < 0) {
+      int insertionPoint = -idx - 1; //see Arrays.binarySearch() JavaDoc
+      lastElementPos = insertionPoint - 1;
+    }
+    else {
+      lastElementPos = idx;
+    }
+    /*
+     * ensure that we throw out any exceptions above highWatermark to make
+     * {@link #isWriteIdValid(long)} faster
+     */
+    this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
+  }
+  public ValidCompactorWriteIdList(String value) {
+    super(value);
+  }
+  /**
+   * Returns org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse.ALL if all write ids in
+   * the range are resolved and RangeResponse.NONE otherwise
+   */
+  @Override
+  public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
+    return highWatermark >= maxWriteId ? RangeResponse.ALL : RangeResponse.NONE;
+  }
+
+  @Override
+  public boolean isWriteIdAborted(long writeId) {
+    return Arrays.binarySearch(exceptions, writeId) >= 0;
+  }
+}
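[Editor's note: a small usage sketch of the class above, with illustrative values: write ids 7 and 9 are aborted and the high-water mark is 10, so any range reaching past 10 is unresolved.]

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;

    public class CompactorWriteIdListDemo {
      public static void main(String[] args) {
        long[] aborted = {7L, 9L};
        BitSet abortedBits = new BitSet();
        abortedBits.set(0, aborted.length);  // constructor expects all-aborted bits
        ValidCompactorWriteIdList ids =
            new ValidCompactorWriteIdList("db1.t1", aborted, abortedBits, 10L);
        System.out.println(ids.isWriteIdRangeValid(1, 10));  // ALL: whole range is resolved
        System.out.println(ids.isWriteIdRangeValid(5, 12));  // NONE: 12 is above the high-water mark
        System.out.println(ids.isWriteIdAborted(9));         // true: 9 is in the aborted list
      }
    }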
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index ccdd4b7..dd432d9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -59,24 +59,16 @@ public class ValidReadTxnList implements ValidTxnList {
 
   @Override
   public boolean isTxnValid(long txnid) {
-    if (highWatermark < txnid) {
+    if (txnid > highWatermark) {
       return false;
     }
     return Arrays.binarySearch(exceptions, txnid) < 0;
   }
 
-  /**
-   * We cannot use a base file if its range contains an open txn.
-   * @param txnid from base_xxxx
-   */
-  @Override
-  public boolean isValidBase(long txnid) {
-    return minOpenTxn > txnid && txnid <= highWatermark;
-  }
   @Override
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
     // check the easy cases first
-    if (highWatermark < minTxnId) {
+    if (minTxnId > highWatermark) {
       return RangeResponse.NONE;
     } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) {
       return RangeResponse.ALL;