http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 0000000,33f24fb..080cc52 mode 000000,100644..100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@@ -1,0 -1,504 +1,509 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hive.metastore.txn; + + import com.google.common.annotations.VisibleForTesting; ++ + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.classification.InterfaceStability; + import org.apache.hadoop.conf.Configurable; + import org.apache.hadoop.hive.common.ValidTxnList; + import org.apache.hadoop.hive.common.ValidWriteIdList; + import org.apache.hadoop.hive.common.classification.RetrySemantics; + import org.apache.hadoop.hive.metastore.api.*; + import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; + + import java.sql.SQLException; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Set; + + /** + * A handler to answer transaction related calls that come into the metastore + * server. + */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public interface TxnStore extends Configurable { + + enum MUTEX_KEY { + Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, + WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild + } + // Compactor states (Should really be enum) + String INITIATED_RESPONSE = "initiated"; + String WORKING_RESPONSE = "working"; + String CLEANING_RESPONSE = "ready for cleaning"; + String FAILED_RESPONSE = "failed"; + String SUCCEEDED_RESPONSE = "succeeded"; + String ATTEMPTED_RESPONSE = "attempted"; + + int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000; + + /** + * Get information about open transactions. This gives extensive information about the + * transactions rather than just the list of transactions. This should be used when the need + * is to see information about the transactions (e.g. show transactions). + * @return information about open transactions + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; + + /** + * Get list of valid transactions. This gives just the list of transactions that are open. 
+ * @return list of open transactions, as well as a high water mark. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetOpenTxnsResponse getOpenTxns() throws MetaException; + + /** + * Get the count for open transactions. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + void countOpenTxns() throws MetaException; + + /** + * Open a set of transactions + * @param rqst request to open transactions + * @return information on opened transactions + * @throws MetaException + */ + @RetrySemantics.Idempotent + OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; + + @RetrySemantics.Idempotent + long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException; + + /** + * Abort (rollback) a transaction. + * @param rqst info on transaction to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException; + + /** + * Abort (rollback) a list of transactions in one request. + * @param rqst info on transactions to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException; + + /** + * Commit a transaction + * @param rqst info on transaction to commit + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param rqst info on table/partitions and writeid snapshot to replicate. + * @throws MetaException in case of failure + */ + @RetrySemantics.Idempotent + void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException; + + /** + * Get invalidation info for the materialization. 
Currently, the materialization information + * only contains information about whether there was update/delete operations on the source + * tables used by the materialization since it was created. + * @param cm creation metadata for the materialization + * @param validTxnList valid transaction list for snapshot taken for current query + * @throws MetaException + */ + @RetrySemantics.Idempotent + Materialization getMaterializationInvalidationInfo( + final CreationMetadata cm, final String validTxnList) + throws MetaException; + ++ @RetrySemantics.ReadOnly ++ long getTxnIdForWriteId(String dbName, String tblName, long writeId) ++ throws MetaException; ++ + LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) + throws MetaException; + + boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) + throws MetaException; + + long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout) + throws MetaException; + + /** + * Gets the list of valid write ids for the given table wrt to current txn + * @param rqst info on transaction and list of table names associated with given transaction + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) + throws NoSuchTxnException, MetaException; + + /** + * Allocate a write ID for the given table and associate it with a transaction + * @param rqst info on transaction and table to allocate write id + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Called on conversion of existing table to full acid. Sets initial write ID to a high + * enough value so that we can assign unique ROW__IDs to data in existing files. 
+ */ + void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException; + + /** + * Obtain a lock. + * @param rqst information on the lock to obtain. If the requester is part of a transaction + * the txn information must be included in the lock request. + * @return info on the lock, including whether it was obtained. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.CannotRetry + LockResponse lock(LockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait + * state. + * @param rqst info on the lock to check + * @return info on the state of the lock + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + LockResponse checkLock(CheckLockRequest rqst) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case + * the txn should be committed or aborted instead. (Note someday this will change since + * multi-statement transactions will allow unlocking in the transaction.) + * @param rqst lock to unlock + * @throws NoSuchLockException + * @throws TxnOpenException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void unlock(UnlockRequest rqst) + throws NoSuchLockException, TxnOpenException, MetaException; + + /** + * Get information on current locks. + * @param rqst lock information to retrieve + * @return lock information. 
+ * @throws MetaException + */ + @RetrySemantics.ReadOnly + ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; + + /** + * Send a heartbeat for a lock or a transaction + * @param ids lock and/or txn id to heartbeat + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Heartbeat a group of transactions together + * @param rqst set of transactions to heartbeat + * @return info on txns that were heartbeated + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException; + + /** + * Submit a compaction request into the queue. This is called when a user manually requests a + * compaction. + * @param rqst information on what to compact + * @return id of the compaction that has been started or existing id if this resource is already scheduled + * @throws MetaException + */ + @RetrySemantics.Idempotent + CompactionResponse compact(CompactionRequest rqst) throws MetaException; + + /** + * Show list of current compactions. + * @param rqst info on which compactions to show + * @return compaction information + * @throws MetaException + */ + @RetrySemantics.ReadOnly + ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; + + /** + * Add information on a set of dynamic partitions that participated in a transaction. + * @param rqst dynamic partition info. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Clean up corresponding records in metastore tables. 
+ * @param type Hive object type + * @param db database object + * @param table table object + * @param partitionIterator partition iterator + * @throws MetaException + */ + @RetrySemantics.Idempotent + void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException; + + @RetrySemantics.Idempotent + void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, + String newCatName, String newDbName, String newTabName, String newPartName) + throws MetaException; + + /** + * Timeout transactions and/or locks. This should only be called by the compactor. + */ + @RetrySemantics.Idempotent + void performTimeOuts(); + + /** + * This will look through the completed_txn_components table and look for partitions or tables + * that may be ready for compaction. Also, look through txns and txn_components tables for + * aborted transactions that we should add to the list. + * @param maxAborted Maximum number of aborted queries to allow before marking this as a + * potential compaction. + * @return list of CompactionInfo structs. These will not have id, type, + * or runAs set since these are only potential compactions not actual ones. + */ + @RetrySemantics.ReadOnly + Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException; + + /** + * Sets the user to run as. This is for the case + * where the request was generated by the user and so the worker must set this value later. + * @param cq_id id of this entry in the queue + * @param user user to run the jobs as + */ + @RetrySemantics.Idempotent + void setRunAs(long cq_id, String user) throws MetaException; + + /** + * This will grab the next compaction request off of + * the queue, and assign it to the worker. + * @param workerId id of the worker calling this, will be recorded in the db + * @return an info element for this compaction request, or null if there is no work to do now. 
+ */ + @RetrySemantics.ReadOnly + CompactionInfo findNextToCompact(String workerId) throws MetaException; + + /** + * This will mark an entry in the queue as compacted + * and put it in the ready to clean state. + * @param info info on the compaction entry to mark as compacted. + */ + @RetrySemantics.SafeToRetry + void markCompacted(CompactionInfo info) throws MetaException; + + /** + * Find entries in the queue that are ready to + * be cleaned. + * @return information on the entry in the queue. + */ + @RetrySemantics.ReadOnly + List<CompactionInfo> findReadyToClean() throws MetaException; + + /** + * This will remove an entry from the queue after + * it has been compacted. + * + * @param info info on the compaction entry to remove + */ + @RetrySemantics.CannotRetry + void markCleaned(CompactionInfo info) throws MetaException; + + /** + * Mark a compaction entry as failed. This will move it to the compaction history queue with a + * failed status. It will NOT clean up aborted transactions in the table/partition associated + * with this compaction. + * @param info information on the compaction that failed. + * @throws MetaException + */ + @RetrySemantics.CannotRetry + void markFailed(CompactionInfo info) throws MetaException; + + /** + * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by + * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)). + */ + @RetrySemantics.SafeToRetry + void cleanTxnToWriteIdTable() throws MetaException; + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. 
+ */ + @RetrySemantics.SafeToRetry + void cleanEmptyAbortedTxns() throws MetaException; + + /** + * This will take all entries assigned to workers + * on a host return them to INITIATED state. The initiator should use this at start up to + * clean entries from any workers that were in the middle of compacting when the metastore + * shutdown. It does not reset entries from worker threads on other hosts as those may still + * be working. + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, + * so that like hostname% will match the worker id. + */ + @RetrySemantics.Idempotent + void revokeFromLocalWorkers(String hostname) throws MetaException; + + /** + * This call will return all compaction queue + * entries assigned to a worker but over the timeout back to the initiated state. + * This should be called by the initiator on start up and occasionally when running to clean up + * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called + * first. + * @param timeout number of milliseconds since start time that should elapse before a worker is + * declared dead. + */ + @RetrySemantics.Idempotent + void revokeTimedoutWorkers(long timeout) throws MetaException; + + /** + * Queries metastore DB directly to find columns in the table which have statistics information. + * If {@code ci} includes partition info then per partition stats info is examined, otherwise + * table level stats are examined. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException; + + /** + * Record the highest write id that the {@code ci} compaction job will pay attention to. 
+ */ + @RetrySemantics.Idempotent + void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException; + + /** + * For any given compactable entity (partition, table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has the advantage of always retaining the last failure/success even if + * it's not recent. + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void purgeCompactionHistory() throws MetaException; + + /** + * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the + * transaction metadata once it becomes unnecessary. + */ + @RetrySemantics.SafeToRetry + void performWriteSetGC(); + + /** + * Determine if there are enough consecutive failures compacting a table or partition that no + * new automatic compactions should be scheduled. User initiated compactions do not do this + * check. + * @param ci Table or partition to check. + * @return true if it is ok to compact, false if there have been too many failures. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; + + @VisibleForTesting + int numLocksInLockTable() throws SQLException, MetaException; + + @VisibleForTesting + long setTimeout(long milliseconds); + + @RetrySemantics.Idempotent + MutexAPI getMutexAPI(); + + /** + * This is primarily designed to provide coarse grained mutex support to operations running + * inside the Metastore (of which there could be several instances). The initial goal is to + * ensure that various sub-processes of the Compactor don't step on each other. + * + * In RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly. 
+ */ + interface MutexAPI { + /** + * The {@code key} is the name of the lock. Will acquire an exclusive lock or block. It returns + * a handle which must be used to release the lock. Each invocation returns a new handle. + */ + LockHandle acquireLock(String key) throws MetaException; + + /** + * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This + * will associate the lock on {@code key} with the same handle. All locks associated with + * the same handle will be released together. + * @param handle not NULL + */ + void acquireLock(String key, LockHandle handle) throws MetaException; + interface LockHandle { + /** + * Releases all locks associated with this handle. + */ + void releaseLocks(); + } + } + + /** + * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a job to the cluster, + * it calls this to update the metadata. + * @param id {@link CompactionInfo#id} + */ + @RetrySemantics.Idempotent + void setHadoopJobId(String hadoopJobId, long id); + + /** + * Add the ACID write event information to writeNotificationLog table. + * @param acidWriteEvent + */ + @RetrySemantics.Idempotent + void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException; + }
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 0000000,fa291d5..aac5811 mode 000000,100644..100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@@ -1,0 -1,471 +1,481 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hive.metastore.txn; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; + import org.apache.hadoop.hive.common.ValidReaderWriteIdList; + import org.apache.hadoop.hive.common.ValidReadTxnList; + import org.apache.hadoop.hive.common.ValidTxnList; + import org.apache.hadoop.hive.common.ValidTxnWriteIdList; + import org.apache.hadoop.hive.common.ValidWriteIdList; + import org.apache.hadoop.hive.metastore.TransactionalValidationListener; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; ++import org.apache.hadoop.hive.metastore.api.*; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; + import org.apache.hadoop.hive.metastore.utils.JavaUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.util.Collections; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.BitSet; + import java.util.List; + import java.util.Map; + + public class TxnUtils { + private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); + ++ // Transactional stats states ++ static final public char STAT_OPEN = 'o'; ++ static final public char STAT_INVALID = 'i'; ++ static final public char STAT_COMMITTED = 'c'; ++ static final public char STAT_OBSOLETE = 's'; ++ + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. 
+ * @param txns txn list from the metastore + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it + * will be removed from the exceptions list so that the user sees his own + * transaction as valid. + * @return a valid txn list. + */ + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + /* + * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 + * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which + * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should + * include the latest committed set. + */ + long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark()) + : txns.getTxn_high_water_mark(); + + // Open txns are already sorted in ascending order. This list may or may not include HWM + // but it is guaranteed that list won't have txn > HWM. But, if we overwrite the HWM with currentTxn + // then need to truncate the exceptions list accordingly. + List<Long> openTxns = txns.getOpen_txns(); + + // We care only about open/aborted txns below currentTxn and hence the size should be determined + // for the exceptions list. The currentTxn will be missing in openTxns list only in rare case like + // txn is aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns. + // So, for such cases, we get negative value for sizeToHwm with found position for currentTxn, and so, + // we just negate it to get the size. + int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size(); + sizeToHwm = (sizeToHwm < 0) ? 
(-sizeToHwm) : sizeToHwm; + long[] exceptions = new long[sizeToHwm]; + BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits()); + BitSet outAbortedBits = new BitSet(); + long minOpenTxnId = Long.MAX_VALUE; + int i = 0; + for (long txn : openTxns) { + // For snapshot isolation, we don't care about txns greater than current txn and so stop here. + // Also, we need not include current txn to exceptions list. + if ((currentTxn > 0) && (txn >= currentTxn)) { + break; + } + if (inAbortedBits.get(i)) { + outAbortedBits.set(i); + } else if (minOpenTxnId == Long.MAX_VALUE) { + minOpenTxnId = txn; + } + exceptions[i++] = txn; + } + return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId); + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. + * @param currentTxnId current txn ID for which we get the valid write ids list + * @param list valid write ids list from the metastore + * @return a valid write IDs list for the whole transaction. + */ + public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId, + List<TableValidWriteIds> validIds) { + ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId); + for (TableValidWriteIds tableWriteIds : validIds) { + validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds)); + } + return validTxnWriteIdList; + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a + * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted write ids as invalid. 
+ * @param tableWriteIds valid write ids for the given table from the metastore + * @return a valid write IDs list for the input table + */ + public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) { + String fullTableName = tableWriteIds.getFullTableName(); + long highWater = tableWriteIds.getWriteIdHighWaterMark(); + List<Long> invalids = tableWriteIds.getInvalidWriteIds(); + BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits()); + long[] exceptions = new long[invalids.size()]; + int i = 0; + for (long writeId : invalids) { + exceptions[i++] = writeId; + } + if (tableWriteIds.isSetMinOpenWriteId()) { + return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater, + tableWriteIds.getMinOpenWriteId()); + } else { + return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater); + } + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a + * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions/write ids as invalid. Additionally any + * writeId > highestOpenWriteId is also invalid. This is to avoid creating something like + * delta_17_120 where writeId 80, for example, is still open. + * @param tableValidWriteIds table write id list from the metastore + * @return a valid write id list. 
+ */ + public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) { + String fullTableName = tableValidWriteIds.getFullTableName(); + long highWater = tableValidWriteIds.getWriteIdHighWaterMark(); + long minOpenWriteId = Long.MAX_VALUE; + List<Long> invalids = tableValidWriteIds.getInvalidWriteIds(); + BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits()); + long[] exceptions = new long[invalids.size()]; + int i = 0; + for (long writeId : invalids) { + if (abortedBits.get(i)) { + // Only need aborted since we don't consider anything above minOpenWriteId + exceptions[i++] = writeId; + } else { + minOpenWriteId = Math.min(minOpenWriteId, writeId); + } + } + if(i < exceptions.length) { + exceptions = Arrays.copyOf(exceptions, i); + } + highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1; + BitSet bitSet = new BitSet(exceptions.length); + bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions are aborted + if (minOpenWriteId == Long.MAX_VALUE) { + return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater); + } else { + return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId); + } + } + + public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) { + // This is based on the existing valid write ID list that was built for a select query; + // therefore we assume all the aborted txns, etc. were already accounted for. + // All we do is adjust the high watermark to only include contiguous txns. 
+ Long minOpenWriteId = ids.getMinOpenWriteId(); + if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) { + return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1); + } + return ids; + } + + /** + * Get an instance of the TxnStore that is appropriate for this store + * @param conf configuration + * @return txn store + */ + public static TxnStore getTxnStore(Configuration conf) { + String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL); + try { + TxnStore handler = JavaUtils.getClass(className, TxnStore.class).newInstance(); + handler.setConf(conf); + return handler; + } catch (Exception e) { + LOG.error("Unable to instantiate raw store directly in fastpath mode", e); + throw new RuntimeException(e); + } + } + + /** + * Note, users are responsible for using the correct TxnManager. We do not look at + * SessionState.get().getTxnMgr().supportsAcid() here + * Should produce the same result as + * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}. + * @return true if table is a transactional table, false otherwise + */ + public static boolean isTransactionalTable(Table table) { + if (table == null) { + return false; + } + Map<String, String> parameters = table.getParameters(); + String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); ++ return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); ++ } ++ ++ public static boolean isTransactionalTable(Map<String, String> parameters) { ++ if (parameters == null) { ++ return false; ++ } ++ String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + + /** + * Should produce the same result as + * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}. 
+ */ + public static boolean isAcidTable(Table table) { + return TxnUtils.isTransactionalTable(table) && + TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY.equals(table.getParameters() + .get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES)); + } + + /** + * Should produce the result as <dbName>.<tableName>. + */ + public static String getFullTableName(String dbName, String tableName) { + return dbName.toLowerCase() + "." + tableName.toLowerCase(); + } + + public static String[] getDbTableName(String fullTableName) { + return fullTableName.split("\\."); + } + + + + /** + * Build a query (or queries if one query is too big but only for the case of 'IN' + * composite clause. For the case of 'NOT IN' clauses, multiple queries change + * the semantics of the intended query. + * E.g., Let's assume that input "inList" parameter has [5, 6] and that + * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause, + * Then having two delete statements changes the semantics of the inteneded SQL statement. + * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence + * is not equal to 'delete from T where a not in (5, 6)'.) + * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters. + * + * Note that this method currently support only single column for + * IN/NOT IN clauses and that only covers OR-based composite 'IN' clause and + * AND-based composite 'NOT IN' clause. + * For example, for 'IN' clause case, the method will build a query with OR. + * E.g., "id in (1,2,3) OR id in (4,5,6)". + * For 'NOT IN' case, NOT IN list is broken into multiple 'NOT IN" clauses connected by AND. + * + * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN' + * clauses in a query". 
+ * + * @param queries OUT: Array of query strings + * @param prefix IN: Part of the query that comes before IN list + * @param suffix IN: Part of the query that comes after IN list + * @param inList IN: the list with IN list values + * @param inColumn IN: single column name of IN list operator + * @param addParens IN: add a pair of parenthesis outside the IN lists + * e.g. "(id in (1,2,3) OR id in (4,5,6))" + * @param notIn IN: is this for building a 'NOT IN' composite clause? + * @return OUT: a list of the count of IN list values that are in each of the corresponding queries + */ + public static List<Integer> buildQueryWithINClause(Configuration conf, + List<String> queries, + StringBuilder prefix, + StringBuilder suffix, + List<Long> inList, + String inColumn, + boolean addParens, + boolean notIn) { + List<String> inListStrings = new ArrayList<>(inList.size()); + for (Long aLong : inList) { + inListStrings.add(aLong.toString()); + } + return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, + inListStrings, inColumn, addParens, notIn); + + } + /** + * Build a query (or queries if one query is too big but only for the case of 'IN' + * composite clause. For the case of 'NOT IN' clauses, multiple queries change + * the semantics of the intended query. + * E.g., Let's assume that input "inList" parameter has [5, 6] and that + * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause, + * Then having two delete statements changes the semantics of the inteneded SQL statement. + * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence + * is not equal to 'delete from T where a not in (5, 6)'.) + * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters. + * + * Note that this method currently support only single column for + * IN/NOT IN clauses and that only covers OR-based composite 'IN' clause and + * AND-based composite 'NOT IN' clause. 
   * For example, for 'IN' clause case, the method will build a query with OR.
   * E.g., "id in (1,2,3) OR id in (4,5,6)".
   * For 'NOT IN' case, NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
   *
   * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
   * clauses in a query".
   *
   * @param queries OUT: Array of query strings
   * @param prefix IN: Part of the query that comes before IN list
   * @param suffix IN: Part of the query that comes after IN list
   * @param inList IN: the list with IN list values
   * @param inColumn IN: single column name of IN list operator
   * @param addParens IN: add a pair of parenthesis outside the IN lists
   *                  e.g. "(id in (1,2,3) OR id in (4,5,6))"
   * @param notIn IN: is this for building a 'NOT IN' composite clause?
   * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
   */
  public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
      StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
    // Get configuration parameters: max query length is in KB (see the *1024 below);
    // batchSize caps how many values go into a single IN (...) clause.
    int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);

    // Check parameter set validity as a public method.
    // NOTE(review): this message only describes the empty-list case, but the same
    // exception also fires for non-positive maxQueryLength/batchSize — consider
    // splitting the checks so the message matches the actual failure.
    if (inList == null || inList.size() == 0 || maxQueryLength <= 0 || batchSize <= 0) {
      throw new IllegalArgumentException("The IN list is empty!");
    }

    // Define constants and local variables.
    int inListSize = inList.size();
    StringBuilder buf = new StringBuilder();

    int cursor4InListArray = 0,       // cursor for the "inList" array.
        cursor4InClauseElements = 0,  // cursor for an element list per an 'IN'/'NOT IN'-clause.
        cursor4queryOfInClauses = 0;  // cursor for in-clause lists per a query.
    // nextItemNeeded: the current nextValue has been consumed and a fresh element
    // must be fetched from inList on the next iteration.
    boolean nextItemNeeded = true;
    // newInclausePrefixJustAppended: the last thing appended to buf was an empty
    // " or col in (" / " and col not in (" prefix, which must be rolled back if
    // the query is finalized before any value lands in that clause.
    boolean newInclausePrefixJustAppended = false;
    StringBuilder nextValue = new StringBuilder("");
    StringBuilder newInclausePrefix =
        new StringBuilder(notIn ? " and " + inColumn + " not in (":
        " or " + inColumn + " in (");
    List<Integer> ret = new ArrayList<>();
    int currentCount = 0; // number of IN-list values placed into the query being built.

    // Loop over the given inList elements. The "!nextItemNeeded" disjunct keeps the
    // loop alive after the last element is fetched but not yet appended.
    while( cursor4InListArray < inListSize || !nextItemNeeded) {
      if (cursor4queryOfInClauses == 0) {
        // Append prefix: "<prefix>[ ( ]<col>[ not] in ("
        buf.append(prefix);
        if (addParens) {
          buf.append("(");
        }
        buf.append(inColumn);

        if (notIn) {
          buf.append(" not in (");
        } else {
          buf.append(" in (");
        }
        cursor4queryOfInClauses++;
        newInclausePrefixJustAppended = false;
      }

      // Get the next "inList" value element if needed.
      if (nextItemNeeded) {
        nextValue.setLength(0);
        nextValue.append(String.valueOf(inList.get(cursor4InListArray++)));
        nextItemNeeded = false;
      }

      // Compute the size of a query when the 'nextValue' is added to the current query.
      int querySize = querySizeExpected(buf.length(), nextValue.length(), suffix.length(), addParens);

      if (querySize > maxQueryLength * 1024) {
        // Check an edge case where the DIRECT_SQL_MAX_QUERY_LENGTH does not allow one 'IN' clause with single value.
        if (cursor4queryOfInClauses == 1 && cursor4InClauseElements == 0) {
          throw new IllegalArgumentException("The current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + " is set too small to have one IN clause with single value!");
        }

        // Check an edge case to throw Exception if we can not build a single query for 'NOT IN' clause cases as mentioned at the method comments.
        if (notIn) {
          throw new IllegalArgumentException("The NOT IN list has too many elements for the current " + ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH.getVarname() + "!");
        }

        // Wrap up the current query string since we can not add another "inList" element value.
        if (newInclausePrefixJustAppended) {
          // Roll back the dangling, value-less clause prefix appended last iteration.
          buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
        }

        buf.setCharAt(buf.length() - 1, ')'); // overwrite the trailing comma to finish the 'IN' clause string.

        if (addParens) {
          buf.append(")");
        }

        buf.append(suffix);
        queries.add(buf.toString());
        ret.add(currentCount);

        // Prepare a new query string; nextValue is intentionally kept so the pending
        // element is appended into the fresh query on the next iteration.
        buf.setLength(0);
        currentCount = 0;
        cursor4queryOfInClauses = cursor4InClauseElements = 0;
        querySize = 0; // NOTE(review): dead store — querySize is recomputed before next use.
        newInclausePrefixJustAppended = false;
        continue;
      } else if (cursor4InClauseElements >= batchSize-1 && cursor4InClauseElements != 0) {
        // Finish the current 'IN'/'NOT IN' clause and start a new clause.
        buf.setCharAt(buf.length() - 1, ')'); // overwrite the trailing comma.
        buf.append(newInclausePrefix.toString());

        newInclausePrefixJustAppended = true;

        // increment cursor for per-query IN-clause list
        cursor4queryOfInClauses++;
        cursor4InClauseElements = 0;
      } else {
        buf.append(nextValue.toString()).append(",");
        currentCount++;
        nextItemNeeded = true;
        newInclausePrefixJustAppended = false;
        // increment cursor for elements per 'IN'/'NOT IN' clause.
        cursor4InClauseElements++;
      }
    }

    // Finish the last query (same wrap-up as the overflow branch above).
    if (newInclausePrefixJustAppended) {
      buf.delete(buf.length()-newInclausePrefix.length(), buf.length());
    }
    buf.setCharAt(buf.length() - 1, ')'); // overwrite the trailing comma.
    if (addParens) {
      buf.append(")");
    }
    buf.append(suffix);
    queries.add(buf.toString());
    ret.add(currentCount);
    return ret;
  }

  /**
   * Compute and return the size of a query statement with the given parameters as input variables.
   *
   * @param sizeSoFar size of the current contents of the buf
   * @param sizeNextItem size of the next 'IN' clause element value.
   * @param suffixSize size of the suffix for a query statement
   * @param addParens Do we add an additional parenthesis?
+ */ + private static int querySizeExpected(int sizeSoFar, + int sizeNextItem, + int suffixSize, + boolean addParens) { + + int size = sizeSoFar + sizeNextItem + suffixSize; + + if (addParens) { + size++; + } + + return size; + } + }
