Implementing the TAL (Transaction Abstraction Layer) for Tephra
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3431902f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3431902f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3431902f Branch: refs/heads/omid Commit: 3431902fd83c590381acb9a817b05676e588004e Parents: 96f8d09 Author: Ohad Shacham <oh...@yahoo-inc.com> Authored: Tue Mar 7 12:03:50 2017 +0200 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Thu Mar 9 13:29:37 2017 -0800 ---------------------------------------------------------------------- .../transaction/OmidTransactionContext.java | 77 +++++ .../transaction/OmidTransactionTable.java | 323 +++++++++++++++++++ .../transaction/PhoenixTransactionContext.java | 12 - .../transaction/PhoenixTransactionalTable.java | 5 - .../transaction/TephraTransactionContext.java | 285 ++++++++++++++++ .../transaction/TephraTransactionTable.java | 303 +++++++++++++++++ 6 files changed, 988 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java new file mode 100644 index 0000000..937ac14 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -0,0 +1,77 @@ +package org.apache.phoenix.transaction; + +import java.sql.SQLException; +import java.util.concurrent.TimeoutException; + +import org.apache.phoenix.schema.PTable; + +public class OmidTransactionContext implements PhoenixTransactionContext { + + @Override + public void begin() throws SQLException { + // TODO Auto-generated method stub + 
+ } + + @Override + public void commit() throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void abort() throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void commitDDLFence(PTable dataTable) throws SQLException, + InterruptedException, TimeoutException { + // TODO Auto-generated method stub + + } + + @Override + public void markDMLFence(PTable table) { + // TODO Auto-generated method stub + + } + + @Override + public void join(PhoenixTransactionContext ctx) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isTransactionRunning() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void reset() { + // TODO Auto-generated method stub + + } + + @Override + public long getTransactionId() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getReadPointer() { + // TODO Auto-generated method stub + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java new file mode 100644 index 0000000..d2cd020 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -0,0 +1,323 @@ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import 
org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class OmidTransactionTable implements PhoenixTransactionalTable { + + public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { + // TODO Auto-generated constructor stub + } + + @Override + public Result get(Get get) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void put(Put put) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void delete(Delete delete) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] getTableName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Configuration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + // TODO Auto-generated method stub + return null; + 
} + + @Override + public boolean exists(Get get) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void put(List<Put> puts) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void setAutoFlush(boolean autoFlush) { + // TODO Auto-generated method stub + } + + @Override + public boolean isAutoFlush() { + // TODO Auto-generated method stub + return false; + } + + @Override + public long getWriteBufferSize() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void flushCommits() throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + // TODO Auto-generated method stub + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + // TODO Auto-generated method stub 
+ } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public TableName getName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, + Object[] results, Callback<R> callback) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, + Callback<R> callback) throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void 
mutateRow(RowMutations rm) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public Result append(Append append) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result increment(Increment increment) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, Durability durability) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService( + Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Call<T, R> callable, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) + throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R extends Message> void batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public boolean checkAndMutate(byte[] 
row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + // TODO Auto-generated method stub + return false; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index f07640e..af0ff05 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -29,18 +29,6 @@ public interface PhoenixTransactionContext { public void abort() throws SQLException; /** - * Rollback a transaction - * - * @param e - * @throws SQLException - */ - public void abort(SQLException e) throws SQLException; - - /** - * Create a checkpoint in a transaction as defined in [TEPHRA-96] - * @throws SQLException - */ - public void checkpoint() throws SQLException; /** * Commit DDL to guarantee that no transaction started before create index http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java index 7495c5b..dcab73d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java @@ -101,11 +101,6 @@ public interface PhoenixTransactionalTable extends 
HTableInterface { public void delete(List<Delete> deletes) throws IOException; /** - * Return the underling htable - */ - public HTableInterface getHTable(); - - /** * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)} */ public void setAutoFlush(boolean autoFlush); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java new file mode 100644 index 0000000..8fc5e0f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -0,0 +1,285 @@ +package org.apache.phoenix.transaction; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.Transaction.VisibilityLevel; +import org.apache.tephra.visibility.FenceWait; +import org.apache.tephra.visibility.VisibilityFence; + +import com.google.common.collect.Lists; + +public class TephraTransactionContext implements PhoenixTransactionContext { + + private final List<TransactionAware> txAwares; + private 
final TransactionContext txContext; + private Transaction tx; + private TransactionSystemClient txServiceClient; + private TransactionFailureException e; + + public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean threadSafe) { + + this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); + + assert(ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; + + if (threadSafe) { + this.tx = tephraTransactionContext.getTransaction(); + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } else { + this.txAwares = Collections.emptyList(); + if (ctx == null) { + this.txContext = new TransactionContext(txServiceClient); + } else { + this.txContext = tephraTransactionContext.getContext(); + } + } + + this.e = null; + } + + @Override + public void begin() throws SQLException { + if (txContext == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); + } + + try { + txContext.start(); + } catch (TransactionFailureException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + } + + @Override + public void commit() throws SQLException { + try { + assert(txContext != null); + txContext.finish(); + } catch (TransactionFailureException e) { + this.e = e; + if (e instanceof TransactionConflictException) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + } + + @Override + public void abort() throws SQLException { + try { + if (e != null) { + txContext.abort(e); + e = null; + } else { + 
txContext.abort(); + } + } catch (TransactionFailureException e) { + this.e = null; + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()) + .setRootCause(e) + .build().buildException(); + } + } + + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + if (hasUncommittedData) { + try { + if (txContext == null) { + tx = txServiceClient.checkpoint(tx); + } else { + assert(txContext != null); + txContext.checkpoint(); + tx = txContext.getCurrentTransaction(); + } + } catch (TransactionFailureException e) { + throw new SQLException(e); + } + } + + if (txContext == null) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } + else { + assert(txContext != null); + txContext.getCurrentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } + } + + @Override + public void commitDDLFence(PTable dataTable) throws SQLException, + InterruptedException, TimeoutException { + byte[] key = dataTable.getName().getBytes(); + try { + FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient); + fenceWait.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + } catch (TimeoutException | TransactionFailureException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()) + .build().buildException(); + } + } + + @Override + public void markDMLFence(PTable table) { + byte[] logicalKey = table.getName().getBytes(); + TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); + if (this.txContext == null) { + this.txAwares.add(logicalTxAware); + } else { + this.txContext.addTransactionAware(logicalTxAware); + } + byte[] physicalKey = 
table.getPhysicalName().getBytes(); + if (Bytes.compareTo(physicalKey, logicalKey) != 0) { + TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); + if (this.txContext == null) { + this.txAwares.add(physicalTxAware); + } else { + this.txContext.addTransactionAware(physicalTxAware); + } + } + } + + @Override + public void join(PhoenixTransactionContext ctx) { + assert(ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; + + tephraContext.getAwares(); + + if (txContext != null) { + for (TransactionAware txAware : tephraContext.getAwares()) { + txContext.addTransactionAware(txAware); + } + } else { + txAwares.addAll(tephraContext.getAwares()); + } + } + + @Override + public boolean isTransactionRunning() { + if (this.txContext != null) { + return (this.txContext.getCurrentTransaction() != null); + } + + if (this.tx != null) { + return true; + } + + return false; + } + + @Override + public void reset() { + tx = null; + txAwares.clear(); + } + + @Override + public long getTransactionId() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getTransactionId(); + } + + if (tx != null) { + return tx.getTransactionId(); + } + + return HConstants.LATEST_TIMESTAMP; + } + + @Override + public long getReadPointer() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getReadPointer(); + } + + if (tx != null) { + return tx.getReadPointer(); + } + + return (-1); + } + + /** + * TephraTransactionContext specific functions + */ + + Transaction getTransaction() { + return this.tx; + } + + TransactionContext getContext() { + return this.txContext; + } + + List<TransactionAware> getAwares() { + return txAwares; + } + + void addTransactionAware(TransactionAware txAware) { + if (this.txContext != null) { + txContext.addTransactionAware(txAware); + } else if (this.tx != null) { + txAwares.add(txAware); + } + } + + // For testing + public long getWritePointer() 
{ + if (this.txContext != null) { + return txContext.getCurrentTransaction().getWritePointer(); + } + + if (tx != null) { + return tx.getWritePointer(); + } + + return HConstants.LATEST_TIMESTAMP; + } + + // For testing + public VisibilityLevel getVisibilityLevel() { + if (this.txContext != null) { + return txContext.getCurrentTransaction().getVisibilityLevel(); + } + + if (tx != null) { + return tx.getVisibilityLevel(); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3431902f/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java new file mode 100644 index 0000000..50ea600 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -0,0 +1,303 @@ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; 
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.tephra.hbase.TransactionAwareHTable; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class TephraTransactionTable implements PhoenixTransactionalTable { + + private TransactionAwareHTable transactionAwareHTable; + + private TephraTransactionContext tephraTransactionContext; + + public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { + + assert(ctx instanceof TephraTransactionContext); + + tephraTransactionContext = (TephraTransactionContext) ctx; + + transactionAwareHTable = new TransactionAwareHTable(hTable); + + tephraTransactionContext.addTransactionAware(transactionAwareHTable); + } + + @Override + public Result get(Get get) throws IOException { + return transactionAwareHTable.get(get); + } + + @Override + public void put(Put put) throws IOException { + transactionAwareHTable.put(put); + } + + @Override + public void delete(Delete delete) throws IOException { + transactionAwareHTable.delete(delete); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return transactionAwareHTable.getScanner(scan); + } + + @Override + public byte[] getTableName() { + return transactionAwareHTable.getTableName(); + } + + @Override + public Configuration getConfiguration() { + return transactionAwareHTable.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return transactionAwareHTable.getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return transactionAwareHTable.exists(get); + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + return transactionAwareHTable.get(gets); + } + + @Override + public ResultScanner 
getScanner(byte[] family) throws IOException { + return transactionAwareHTable.getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + return transactionAwareHTable.getScanner(family, qualifier); + } + + @Override + public void put(List<Put> puts) throws IOException { + transactionAwareHTable.put(puts); + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + transactionAwareHTable.delete(deletes); + } + + @Override + public void setAutoFlush(boolean autoFlush) { + transactionAwareHTable.setAutoFlush(autoFlush); + } + + @Override + public boolean isAutoFlush() { + return transactionAwareHTable.isAutoFlush(); + } + + @Override + public long getWriteBufferSize() { + return transactionAwareHTable.getWriteBufferSize(); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + transactionAwareHTable.setWriteBufferSize(writeBufferSize); + } + + @Override + public void flushCommits() throws IOException { + transactionAwareHTable.flushCommits(); + } + + @Override + public void close() throws IOException { + transactionAwareHTable.close(); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + return transactionAwareHTable.exists(gets); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + transactionAwareHTable.setAutoFlush(autoFlush); + } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return transactionAwareHTable.getRowOrBefore(row, 
family); + } + + @Override + public TableName getName() { + return transactionAwareHTable.getName(); + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + return transactionAwareHTable.existsAll(gets); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) + throws IOException, InterruptedException { + transactionAwareHTable.batch(actions, results); + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, + InterruptedException { + return transactionAwareHTable.batch(actions); + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, + Object[] results, Callback<R> callback) throws IOException, + InterruptedException { + transactionAwareHTable.batchCallback(actions, results, callback); + } + + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, + Callback<R> callback) throws IOException, InterruptedException { + return transactionAwareHTable.batchCallback(actions, callback); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete); 
+ } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + transactionAwareHTable.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return transactionAwareHTable.append(append); + } + + @Override + public Result increment(Increment increment) throws IOException { + return transactionAwareHTable.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, Durability durability) + throws IOException { + return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return transactionAwareHTable.coprocessorService(row); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService( + Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Call<T, R> callable, + Callback<R> callback) throws ServiceException, Throwable { + transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback); + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) + throws ServiceException, Throwable { + return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); + } + + @Override + public <R extends Message> void 
batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, + Callback<R> callback) throws ServiceException, Throwable { + transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + +}