PHOENIX-4660 Use TransactionProvider interface
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af0f68b9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af0f68b9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af0f68b9 Branch: refs/heads/4.x-cdh5.13 Commit: af0f68b919087553f9356142c223829ae5cf1bff Parents: c1827f2 Author: James Taylor <jtay...@salesforce.com> Authored: Sat Mar 17 22:16:24 2018 +0000 Committer: Pedro Boado <pbo...@apache.org> Committed: Fri Mar 23 21:31:53 2018 +0000 ---------------------------------------------------------------------- .../phoenix/tx/FlappingTransactionIT.java | 8 +- .../PhoenixTransactionalProcessor.java | 2 +- .../apache/phoenix/execute/MutationState.java | 8 +- .../PhoenixTxIndexMutationGenerator.java | 2 +- .../apache/phoenix/index/IndexMaintainer.java | 2 +- .../index/IndexMetaDataCacheFactory.java | 2 +- .../index/PhoenixIndexMetaDataBuilder.java | 2 +- .../query/ConnectionQueryServicesImpl.java | 2 +- .../query/ConnectionlessQueryServicesImpl.java | 2 +- .../transaction/OmidTransactionProvider.java | 78 +++++++++++++ .../transaction/TephraTransactionProvider.java | 76 ++++++++++++ .../phoenix/transaction/TransactionFactory.java | 117 ++----------------- .../transaction/TransactionProvider.java | 36 ++++++ .../org/apache/phoenix/util/PhoenixRuntime.java | 2 +- .../apache/phoenix/util/TransactionUtil.java | 4 +- .../java/org/apache/phoenix/query/BaseTest.java | 6 +- 16 files changed, 218 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index 301768b..200cf1c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -225,9 +225,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { } PhoenixTransactionContext txContext = - TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + TransactionFactory.getTransactionProvider().getTransactionContext(pconn); PhoenixTransactionalTable txTable = - TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable); txContext.begin(); @@ -277,9 +277,9 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT { // Repeat the same as above, but this time abort the transaction txContext = - TransactionFactory.getTransactionFactory().getTransactionContext(pconn); + TransactionFactory.getTransactionProvider().getTransactionContext(pconn); txTable = - TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable); + TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, htable); txContext.begin(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java index ca0c997..0c26ecc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java @@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory; public class PhoenixTransactionalProcessor extends DelegateRegionObserver { public PhoenixTransactionalProcessor() { - super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor()); + super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 39cb7a5..727b424 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -183,15 +183,15 @@ public class MutationState implements SQLCloseable { : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; if (!subTask) { if (txContext == null) { - phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection); + phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(connection); } else { isExternalTxContext = true; - phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); + phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask); } } else { // this code path is only used while running child scans, we can't pass the txContext to child scans // as it is not thread safe, so we use the tx member variable - phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); + phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask); } } @@ -1224,7 +1224,7 @@ public class MutationState implements SQLCloseable { } public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException { - return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes); + return TransactionFactory.getTransactionProvider().getTransactionContext(txnBytes); } private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index b5031af..7d6154e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -181,7 +181,7 @@ public class PhoenixTxIndexMutationGenerator { scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); - PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable); + PhoenixTransactionalTable txTable = TransactionFactory.getTransactionProvider().getTransactionalTable(indexMetaData.getTransactionContext(), htable); // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 9042557..15d8ac3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -1068,7 +1068,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionProvider().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { nDeleteCF++; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 03db767..94fbd0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -52,7 +52,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); final PhoenixTransactionContext txnContext; try { - txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null; + txnContext = txState.length != 0 ? TransactionFactory.getTransactionProvider().getTransactionContext(txState) : null; } catch (IOException e) { throw new SQLException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java index c954cf0..5e6f756 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java @@ -63,7 +63,7 @@ public class PhoenixIndexMetaDataBuilder { boolean useProto = md != null; byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); - final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState); + final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionProvider().getTransactionContext(txState); byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION); final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); return new IndexMetaDataCache() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 1899e37..eff406d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -400,7 +400,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private void initTxServiceClient() { - txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo); + txZKClientService = TransactionFactory.getTransactionProvider().getTransactionContext().setTransactionClient(config, props, connectionInfo); } private void openConnection() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index d25299a..c510b5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -136,7 +136,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Without making a copy of the configuration we cons up, we lose some of our properties // on the server side during testing. this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); - TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config); + TransactionFactory.getTransactionProvider().getTransactionContext().setInMemoryTransactionClient(config); this.guidePostsCache = new GuidePostsCache(this, config); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java new file mode 100644 index 0000000..b0c1bfe --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; + +public class OmidTransactionProvider implements TransactionProvider { + private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider(); + + public static final OmidTransactionProvider getInstance() { + return INSTANCE; + } + + private OmidTransactionProvider() { + } + + @Override + public PhoenixTransactionContext getTransactionContext() { + return new OmidTransactionContext(); + } + + @Override + public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { + //return new OmidTransactionContext(txnBytes); + return null; + } + + @Override + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { + //return new OmidTransactionContext(connection); + return null; + } + + @Override + public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { + //return new OmidTransactionContext(contex, connection, subTask); + return null; + } + + @Override + public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { + //return new OmidTransactionTable(ctx, htable); + return null; + } + + @Override + public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { + return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + } + + @Override + public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { + return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java new file mode 100644 index 0000000..795be9f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.tephra.TxConstants; + +public class TephraTransactionProvider implements TransactionProvider { + private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider(); + + public static final TephraTransactionProvider getInstance() { + return INSTANCE; + } + + private TephraTransactionProvider() { + } + + + @Override + public PhoenixTransactionContext getTransactionContext() { + return new TephraTransactionContext(); + } + + @Override + public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { + return new TephraTransactionContext(txnBytes); + } + + @Override + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { + return new TephraTransactionContext(connection); + } + + @Override + public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { + return new TephraTransactionContext(contex, connection, subTask); + } + + @Override + public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { + return new TephraTransactionTable(ctx, htable); + } + + @Override + public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { + return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + } + + @Override + public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { + return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java index 8b3fc1d..f32764b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -17,127 +17,24 @@ */ package org.apache.phoenix.transaction; -import java.io.IOException; - -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.phoenix.jdbc.PhoenixConnection; - public class TransactionFactory { - - static private TransactionFactory transactionFactory = null; - - private TransactionProcessor tp = TransactionProcessor.Tephra; - enum TransactionProcessor { Tephra, Omid } - private TransactionFactory(TransactionProcessor tp) { - this.tp = tp; - } - - static public void createTransactionFactory(TransactionProcessor tp) { - if (transactionFactory == null) { - transactionFactory = new TransactionFactory(tp); - } - } - - static public TransactionFactory getTransactionFactory() { - if (transactionFactory == null) { - createTransactionFactory(TransactionProcessor.Tephra); - } - - return transactionFactory; - } - - public PhoenixTransactionContext getTransactionContext() { - - PhoenixTransactionContext ctx = null; - - switch(tp) { - case Tephra: - ctx = new TephraTransactionContext(); - break; - case Omid: - ctx = new OmidTransactionContext(); - break; - default: - ctx = null; - } - - return ctx; - } - - public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { - - PhoenixTransactionContext ctx = null; - - switch(tp) { - case Tephra: - ctx = new TephraTransactionContext(txnBytes); - break; - case Omid: -// ctx = new OmidTransactionContext(txnBytes); - break; - default: - ctx = null; - } - - return ctx; + static public TransactionProvider getTransactionProvider() { + return TephraTransactionProvider.getInstance(); } - public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { - - PhoenixTransactionContext ctx = null; - - switch(tp) { - case Tephra: - ctx = new TephraTransactionContext(connection); - break; - case Omid: -// ctx = new OmidTransactionContext(connection); - break; - default: - ctx = null; - } - - return ctx; - } - - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { - - PhoenixTransactionContext ctx = null; - - switch(tp) { - case Tephra: - ctx = new TephraTransactionContext(contex, connection, subTask); - break; - case Omid: -// ctx = new OmidTransactionContext(contex, connection, subTask); - break; - default: - ctx = null; - } - - return ctx; - } - - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { - - PhoenixTransactionalTable table = null; - - switch(tp) { + static public TransactionProvider getTransactionProvider(TransactionProcessor processor) { + switch (processor) { case Tephra: - table = new TephraTransactionTable(ctx, htable); - break; + return TephraTransactionProvider.getInstance(); case Omid: -// table = new OmidTransactionContext(contex, connection, subTask); - break; + return OmidTransactionProvider.getInstance(); default: - table = null; + throw new IllegalArgumentException("Unknown transaction processor: " + processor); } - - return table; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java new file mode 100644 index 0000000..a5704f1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; + +public interface TransactionProvider { + public PhoenixTransactionContext getTransactionContext(); + public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException; + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection); + public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask); + + public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable); + + public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp); + public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index bc381f8..1c25c33 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -1515,7 +1515,7 @@ public class PhoenixRuntime { * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp. */ public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) { - return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); + return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); } public static long getCurrentScn(ReadOnlyProps props) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index a99c700..ab76ffe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -46,11 +46,11 @@ public class TransactionUtil { } public static long convertToNanoseconds(long serverTimeStamp) { - return serverTimeStamp * TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); + return serverTimeStamp * TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond(); } public static long convertToMilliseconds(long serverTimeStamp) { - return serverTimeStamp / TransactionFactory.getTransactionFactory().getTransactionContext().getMaxTransactionsPerSecond(); + return serverTimeStamp / TransactionFactory.getTransactionProvider().getTransactionContext().getMaxTransactionsPerSecond(); } public static PhoenixTransactionalTable getPhoenixTransactionTable(PhoenixTransactionContext phoenixTransactionContext, HTableInterface htable, PTable pTable) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0f68b9/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 326efa3..580becb 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -415,15 +415,15 @@ public abstract class BaseTest { } private static void tearDownTxManager() throws SQLException { - TransactionFactory.getTransactionFactory().getTransactionContext().tearDownTxManager(); + TransactionFactory.getTransactionProvider().getTransactionContext().tearDownTxManager(); } protected static void setTxnConfigs() throws IOException { - TransactionFactory.getTransactionFactory().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS); + TransactionFactory.getTransactionProvider().getTransactionContext().setTxnConfigs(config, tmpFolder.newFolder().getAbsolutePath(), DEFAULT_TXN_TIMEOUT_SECONDS); } protected static void setupTxManager() throws SQLException, IOException { - TransactionFactory.getTransactionFactory().getTransactionContext().setupTxManager(config, getUrl()); + TransactionFactory.getTransactionProvider().getTransactionContext().setupTxManager(config, getUrl()); } private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {