http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java deleted file mode 100644 index 55785be..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.transaction; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTableType; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - -public class TephraTransactionTable implements PhoenixTransactionalTable { - - private TransactionAwareHTable transactionAwareHTable; - - private TephraTransactionContext tephraTransactionContext; - - public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { - this(ctx, hTable, null); - } - - public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) { - - assert(ctx instanceof TephraTransactionContext); - - tephraTransactionContext = (TephraTransactionContext) ctx; - - transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); - - tephraTransactionContext.addTransactionAware(transactionAwareHTable); - - if (pTable != null && pTable.getType() != PTableType.INDEX) { - tephraTransactionContext.markDMLFence(pTable); - } - } - - @Override - public Result get(Get get) throws IOException { - return transactionAwareHTable.get(get); - } - - @Override - public void put(Put put) throws IOException { - transactionAwareHTable.put(put); - } - - @Override - public void delete(Delete delete) throws IOException { - transactionAwareHTable.delete(delete); - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - return transactionAwareHTable.getScanner(scan); - } - - @Override - public byte[] getTableName() { - return transactionAwareHTable.getTableName(); - } - - @Override - public Configuration getConfiguration() { - return transactionAwareHTable.getConfiguration(); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return transactionAwareHTable.getTableDescriptor(); - } - - @Override - public boolean exists(Get get) throws IOException { - return transactionAwareHTable.exists(get); - } - - @Override - public Result[] get(List<Get> gets) throws IOException { - return transactionAwareHTable.get(gets); - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - return transactionAwareHTable.getScanner(family); - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) - throws IOException { - return transactionAwareHTable.getScanner(family, qualifier); - } - - @Override - public void put(List<Put> puts) throws IOException { - transactionAwareHTable.put(puts); - } - - @Override - public void delete(List<Delete> deletes) throws IOException { - transactionAwareHTable.delete(deletes); - } - - @Override - public void setAutoFlush(boolean autoFlush) { - transactionAwareHTable.setAutoFlush(autoFlush); - } - - @Override - public boolean isAutoFlush() { - return transactionAwareHTable.isAutoFlush(); - } - - @Override - public long getWriteBufferSize() { - return transactionAwareHTable.getWriteBufferSize(); - } - - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - transactionAwareHTable.setWriteBufferSize(writeBufferSize); - } - - @Override - public void flushCommits() throws IOException { - transactionAwareHTable.flushCommits(); - } - - @Override - public void close() throws IOException { - transactionAwareHTable.close(); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL); - } - - @Override - public Boolean[] exists(List<Get> gets) throws IOException { - return transactionAwareHTable.exists(gets); - } - - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail); - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - transactionAwareHTable.setAutoFlush(autoFlush); - } - - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - return transactionAwareHTable.getRowOrBefore(row, family); - } - - @Override - public TableName getName() { - return transactionAwareHTable.getName(); - } - - @Override - public boolean[] existsAll(List<Get> gets) throws IOException { - return transactionAwareHTable.existsAll(gets); - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) - throws IOException, InterruptedException { - transactionAwareHTable.batch(actions, results); - } - - @Override - public Object[] batch(List<? extends Row> actions) throws IOException, - InterruptedException { - return transactionAwareHTable.batch(actions); - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, - Object[] results, Callback<R> callback) throws IOException, - InterruptedException { - transactionAwareHTable.batchCallback(actions, results, callback); - } - - @Override - public <R> Object[] batchCallback(List<? extends Row> actions, - Callback<R> callback) throws IOException, InterruptedException { - return transactionAwareHTable.batchCallback(actions, callback); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException { - return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put) throws IOException { - return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException { - return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete) - throws IOException { - return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete); - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - transactionAwareHTable.mutateRow(rm); - } - - @Override - public Result append(Append append) throws IOException { - return transactionAwareHTable.append(append); - } - - @Override - public Result increment(Increment increment) throws IOException { - return transactionAwareHTable.increment(increment); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount) throws IOException { - return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, Durability durability) - throws IOException { - return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability); - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - return transactionAwareHTable.coprocessorService(row); - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService( - Class<T> service, byte[] startKey, byte[] endKey, - Call<T, R> callable) throws ServiceException, Throwable { - return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable); - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, - byte[] startKey, byte[] endKey, Call<T, R> callable, - Callback<R> callback) throws ServiceException, Throwable { - transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback); - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype) - throws ServiceException, Throwable { - return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); - } - - @Override - public <R extends Message> void batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype, - Callback<R> callback) throws ServiceException, Throwable { - transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation) - throws IOException { - return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation); - } - - @Override - public int getOperationTimeout() { - return 0; - } - - @Override - public void setOperationTimeout(int i) { - } - - @Override - public int getRpcTimeout() { - return 0; - } - - @Override - public void setRpcTimeout(int i) { - } - - @Override - public int getReadRpcTimeout() { - return 0; - } - - @Override - public void setReadRpcTimeout(int readRpcTimeout) { - } - - @Override - public int getWriteRpcTimeout() { - return 0; - } - - @Override - public void setWriteRpcTimeout(int writeRpcTimeout) { - } -}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java index ea2822b..67c4bc1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -17,25 +17,56 @@ */ package org.apache.phoenix.transaction; +import java.io.IOException; + +import org.apache.phoenix.coprocessor.MetaDataProtocol; + + public class TransactionFactory { - enum TransactionProcessor { - Tephra, - Omid + public enum Provider { + TEPHRA((byte)1, TephraTransactionProvider.getInstance()), + OMID((byte)2, OmidTransactionProvider.getInstance()); + + private final byte code; + private final PhoenixTransactionProvider provider; + + Provider(byte code, PhoenixTransactionProvider provider) { + this.code = code; + this.provider = provider; + } + + public byte getCode() { + return this.code; + } + + public static Provider fromCode(int code) { + if (code < 1 || code > Provider.values().length) { + throw new IllegalArgumentException("Invalid TransactionFactory.Provider " + code); + } + return Provider.values()[code-1]; + } + + public static Provider getDefault() { + return TEPHRA; + } + + public PhoenixTransactionProvider getTransactionProvider() { + return provider; + } } - static public TransactionProvider getTransactionProvider() { - return TephraTransactionProvider.getInstance(); + public static PhoenixTransactionProvider getTransactionProvider(Provider provider) { + return provider.getTransactionProvider(); } - static public TransactionProvider getTransactionProvider(TransactionProcessor processor) { - switch (processor) { - case Tephra: - return TephraTransactionProvider.getInstance(); - case Omid: - return OmidTransactionProvider.getInstance(); - default: - throw new IllegalArgumentException("Unknown transaction processor: " + processor); + public static PhoenixTransactionContext getTransactionContext(byte[] txState, int clientVersion) throws IOException { + if (txState == null || txState.length == 0) { + return null; } + Provider provider = (clientVersion < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) + ? Provider.OMID + : Provider.fromCode(txState[txState.length-1]); + return provider.getTransactionProvider().getTransactionContext(txState); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java deleted file mode 100644 index a5704f1..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.transaction; - -import java.io.IOException; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.phoenix.jdbc.PhoenixConnection; - -public interface TransactionProvider { - public PhoenixTransactionContext getTransactionContext(); - public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException; - public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection); - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask); - - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable); - - public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp); - public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp); -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 1c25c33..6cf6e56 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -88,7 +88,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.transaction.TransactionFactory; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -1515,7 +1514,7 @@ public class PhoenixRuntime { * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp. */ public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) { - return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); + return TransactionUtil.isTransactionalTimestamp(tsOfCell) ? TransactionUtil.convertToMilliseconds(tsOfCell) : tsOfCell; } public static long getCurrentScn(ReadOnlyProps props) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index dd885fd..996e1dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -59,6 +59,7 @@ import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; @@ -88,6 +89,8 @@ import com.google.common.collect.Lists; */ public class ScanUtil { public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1]; + public static final int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0); + /* * Max length that we fill our key when we turn an inclusive key * into a exclusive key. @@ -930,5 +933,17 @@ public class ScanUtil { public static boolean isIndexRebuild(Scan scan) { return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null; } + + public static int getClientVersion(Scan scan) { + int clientVersion = UNKNOWN_CLIENT_VERSION; + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + if (clientVersionBytes != null) { + clientVersion = Bytes.toInt(clientVersionBytes); + } + return clientVersion; + } + public static void setClientVersion(Scan scan, int version) { + scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version)); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 8cedb1c..dee02d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -25,42 +25,57 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.transaction.PhoenixTransactionContext; -import org.apache.phoenix.transaction.PhoenixTransactionalTable; -import org.apache.phoenix.transaction.TephraTransactionTable; import org.apache.phoenix.transaction.TransactionFactory; -import org.apache.tephra.util.TxUtils; public class TransactionUtil { + // All transaction providers must use an empty byte array as the family delete marker + // (see TxConstants.FAMILY_DELETE_QUALIFIER) + public static final byte[] FAMILY_DELETE_MARKER = HConstants.EMPTY_BYTE_ARRAY; + // All transaction providers must multiply timestamps by this constant. + // (see TxConstants.MAX_TX_PER_MS) + public static final int MAX_TRANSACTIONS_PER_MILLISECOND = 1000000; + // Constant used to empirically determine if a timestamp is a transactional or + // non transactional timestamp (see TxUtils.MAX_NON_TX_TIMESTAMP) + private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1); + private TransactionUtil() { + } public static boolean isTransactionalTimestamp(long ts) { - return !TxUtils.isPreExistingVersion(ts); + return ts >= MAX_NON_TX_TIMESTAMP; } public static boolean isDelete(Cell cell) { - return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY)); + return CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); } - public static long convertToNanoseconds(long serverTimeStamp) { - return serverTimeStamp * TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond(); + public static boolean isDeleteFamily(Cell cell) { + return CellUtil.matchingQualifier(cell, FAMILY_DELETE_MARKER) && CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); } - public static long convertToMilliseconds(long serverTimeStamp) { - return serverTimeStamp / TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond(); + private static Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { + return CellUtil.createCell(row, family, FAMILY_DELETE_MARKER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); } - public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) { - return new TephraTransactionTable(phoenixTransactionContext, htable, pTable); + private static Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { + return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + } + + public static long convertToNanoseconds(long serverTimeStamp) { + return serverTimeStamp * MAX_TRANSACTIONS_PER_MILLISECOND; + } + + public static long convertToMilliseconds(long serverTimeStamp) { + return serverTimeStamp / MAX_TRANSACTIONS_PER_MILLISECOND; } // we resolve transactional tables at the txn read pointer @@ -83,14 +98,14 @@ public class TransactionUtil { return txInProgress ? convertToMilliseconds(mutationState.getInitialWritePointer()) : result.getMutationTime(); } - public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional) throws SQLException { + public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, TransactionFactory.Provider provider) throws SQLException { Long timestamp = null; if (!transactional) { return timestamp; } MutationState mutationState = connection.getMutationState(); if (!mutationState.isTransactionStarted()) { - mutationState.startTransaction(); + mutationState.startTransaction(provider); } timestamp = convertToMilliseconds(mutationState.getInitialWritePointer()); return timestamp; @@ -108,7 +123,7 @@ public class TransactionUtil { if (deleteMarker == null) { deleteMarker = new Put(mutation.getRow()); } - deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteFamilyMarker( + deleteMarker.add(newDeleteFamilyMarker( deleteMarker.getRow(), family, familyCells.get(0).getTimestamp())); @@ -119,7 +134,7 @@ public class TransactionUtil { if (deleteMarker == null) { deleteMarker = new Put(mutation.getRow()); } - deleteMarker.add(TransactionFactory.getTransactionProvider().newDeleteColumnMarker( + deleteMarker.add(newDeleteColumnMarker( deleteMarker.getRow(), family, CellUtil.cloneQualifier(cell), http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java index 76757b0..d88a915 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@ -56,7 +56,6 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; @@ -64,8 +63,8 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.EncodedCQCounter; -import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; @@ -262,7 +261,7 @@ public class CorrelatePlanTest { PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable>emptyList(), false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, true, false, 0, 0L, Boolean.FALSE, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true); + null, null, true, null, 0, 0L, Boolean.FALSE, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true); TableRef sourceTable = new TableRef(pTable); List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); for (PColumn column : sourceTable.getTable().getColumns()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java index 1a7132c..017e6c8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/LiteralResultIteratorPlanTest.java @@ -50,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; @@ -58,11 +57,11 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.EncodedCQCounter; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.junit.Test; @@ -183,7 +182,7 @@ public class LiteralResultIteratorPlanTest { PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false, - false, null, null, null, true, false, 0, 0L, false, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true); + false, null, null, null, true, null, 0, 0L, false, null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, EncodedCQCounter.NULL_COUNTER, true); TableRef sourceTable = new TableRef(pTable); List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); for (PColumn column : sourceTable.getTable().getColumns()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 580becb..0ea63e7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -130,7 +130,6 @@ import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ConfigUtil; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -168,7 +167,6 @@ public abstract class BaseTest { private static final Map<String,String> tableDDLMap; private static final Logger logger = LoggerFactory.getLogger(BaseTest.class); - protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30; @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static final int dropTableTimeout = 300; // 5 mins should be long enough. @@ -414,18 +412,6 @@ public abstract class BaseTest { return url; } - private static void tearDownTxManager() throws SQLException { - TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager(); - } - - protected static void setTxnConfigs() throws IOException { - TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS); - } - - protected static void setupTxManager() throws SQLException, IOException { - TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config, getUrl()); - } - private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception { if (!clusterInitialized) { url = setUpTestCluster(config, serverProps); @@ -434,10 +420,6 @@ public abstract class BaseTest { return url; } - private static void checkTxManagerInitialized(ReadOnlyProps clientProps) throws SQLException, IOException { - setupTxManager(); - } - /** * Set up the test hbase cluster. * @return url to be used by clients to connect to the cluster. @@ -476,11 +458,6 @@ public abstract class BaseTest { final HBaseTestingUtility u = utility; try { destroyDriver(); - try { - tearDownTxManager(); - } catch (Throwable t) { - logger.error("Exception caught when shutting down tx manager", t); - } utility = null; clusterInitialized = false; } finally { @@ -519,9 +496,7 @@ public abstract class BaseTest { protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception { if (driver == null) { - setTxnConfigs(); String url = checkClusterInitialized(serverProps); - checkTxManagerInitialized(serverProps); driver = initAndRegisterTestDriver(url, clientProps); } } @@ -593,6 +568,7 @@ public abstract class BaseTest { conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY); conf.setLong(HConstants.ZK_SESSION_TIMEOUT, 10 * HConstants.DEFAULT_ZK_SESSION_TIMEOUT); conf.setLong(HConstants.ZOOKEEPER_TICK_TIME, 6 * 1000); + // override any defaults based on overrideProps for (Entry<String,String> entry : overrideProps) { conf.set(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index c93e56e..a7569f7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -20,9 +20,12 @@ package org.apache.phoenix.query; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServicesOptions.withDefaults; +import org.apache.curator.shaded.com.google.common.io.Files; import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.tephra.TxConstants; +import org.apache.twill.internal.utils.Networks; /** @@ -69,6 +72,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { * because we want to control it's execution ourselves */ public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = Long.MAX_VALUE; + public static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30; /** @@ -117,7 +121,16 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setHConnectionPoolMaxSize(DEFAULT_HCONNECTION_POOL_MAX_SIZE) .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS) .setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME) - .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY); + .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY) + // setup default configs for Tephra + .set(TxConstants.Manager.CFG_DO_PERSIST, false) + .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times") + .set(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1) + .set(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()) + .set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, Files.createTempDir().getAbsolutePath()) + .set(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS) + .set(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L) + ; } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 1ec07b6..a06fd69 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -780,7 +780,7 @@ public class TestUtil { ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); MutationState mutationState = pconn.getMutationState(); if (table.isTransactional()) { - mutationState.startTransaction(); + mutationState.startTransaction(table.getTransactionProvider()); } try (HTableInterface htable = mutationState.getHTable(table)) { byte[] markerRowKey = Bytes.toBytes("TO_DELETE"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/de83b8d5/phoenix-protocol/src/main/PTable.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto index ba9e0b4..16381dd 100644 --- a/phoenix-protocol/src/main/PTable.proto +++ b/phoenix-protocol/src/main/PTable.proto @@ -100,6 +100,7 @@ message PTable { optional bytes encodingScheme = 35; repeated EncodedCQCounter encodedCQCounters = 36; optional bool useStatsForParallelization = 37; + optional int32 transactionProvider = 38; } message EncodedCQCounter {