This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new b2b49d5 PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider b2b49d5 is described below commit b2b49d5604c9801dd5950e77e8a23e4fbfc3cd88 Author: Istvan Toth <st...@apache.org> AuthorDate: Thu Jan 27 10:54:35 2022 +0100 PHOENIX-6441 Remove TSOMockModule reference from OmidTransactionProvider --- phoenix-core/pom.xml | 10 +- .../end2end/ConnectionQueryServicesTestImpl.java | 3 +- .../phoenix/tx/ParameterizedTransactionIT.java | 10 +- .../phoenix/query/ConnectionQueryServicesImpl.java | 38 ++-- .../NotAvailableTransactionProvider.java | 14 +- .../transaction/OmidTransactionProvider.java | 97 +--------- .../transaction/PhoenixTransactionProvider.java | 6 +- .../transaction/TephraTransactionProvider.java | 68 +------ .../phoenix/transaction/TransactionFactory.java | 1 - .../transaction/TestTransactionServiceManager.java | 207 +++++++++++++++++++++ 10 files changed, 257 insertions(+), 197 deletions(-) diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 2763fcb..bfd08ab 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -385,10 +385,6 @@ </dependency> <dependency> <groupId>org.apache.omid</groupId> - <artifactId>omid-tso-server-hbase2.x</artifactId> - </dependency> - <dependency> - <groupId>org.apache.omid</groupId> <artifactId>omid-hbase-common-hbase2.x</artifactId> </dependency> <dependency> @@ -416,6 +412,12 @@ <dependency> <groupId>org.apache.omid</groupId> <artifactId>omid-tso-server-hbase2.x</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-tso-server-hbase2.x</artifactId> + <scope>test</scope> <type>test-jar</type> </dependency> diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java index da88f3a..7f436ba 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java @@ -32,6 +32,7 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.transaction.PhoenixTransactionClient; import org.apache.phoenix.transaction.PhoenixTransactionService; +import org.apache.phoenix.transaction.TestTransactionServiceManager; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.SQLCloseables; @@ -105,7 +106,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl PhoenixTransactionService txService = txServices[provider.ordinal()]; if (txService == null) { int port = TestUtil.getRandomPort(); - txService = txServices[provider.ordinal()] = provider.getTransactionProvider().getTransactionService(config, connectionInfo, port); + txService = txServices[provider.ordinal()] = TestTransactionServiceManager.startTransactionService(provider, config, connectionInfo, port); } return super.initTransactionClient(provider); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java index 93e55f9..29aa9ef 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -63,7 +62,6 @@ import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -392,8 +390,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor(); - assertFalse(htable.getDescriptor().getCoprocessors().contains(clazz.getName())); + assertFalse(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName())); assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). getColumnFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); @@ -416,8 +413,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { PTable table = pconn.getTable(new PTableKey(null, t1)); Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); assertTrue(table.isTransactional()); - Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor(); - assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName())); + assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName())); try { ddl = "ALTER TABLE " + t1 + " SET transactional=false"; @@ -461,7 +457,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT { table = pconn.getTable(new PTableKey(null, t1)); htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); assertTrue(table.isTransactional()); - assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName())); + assertTrue(htable.getDescriptor().getCoprocessors().contains(transactionProvider.getCoprocessorClassName())); } @Test diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 9664210..220ec14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -100,7 +100,6 @@ import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -146,7 +145,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; @@ -1143,26 +1141,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (isTransactional) { - Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); - if (!newDesc.hasCoprocessor(coprocessorClass.getName())) { - builder.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null); - } - Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor(); - if (coprocessorGCClass != null) { - if (!newDesc.hasCoprocessor(coprocessorGCClass.getName())) { - builder.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null); + String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName(); + if (!newDesc.hasCoprocessor(coprocessorClassName)) { + builder.addCoprocessor(coprocessorClassName, null, priority - 10, null); + } + String coprocessorGCClassName = provider.getTransactionProvider().getGCCoprocessorClassName(); + if (coprocessorGCClassName != null) { + if (!newDesc.hasCoprocessor(coprocessorGCClassName)) { + builder.addCoprocessor(coprocessorGCClassName, null, priority - 10, null); } } } else { // Remove all potential transactional coprocessors for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.available()) { - Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor(); - Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor(); - if (coprocessorClass != null && newDesc.hasCoprocessor(coprocessorClass.getName())) { - builder.removeCoprocessor(coprocessorClass.getName()); + String coprocessorClassName = aprovider.getTransactionProvider().getCoprocessorClassName(); + String coprocessorGCClassName = aprovider.getTransactionProvider().getGCCoprocessorClassName(); + if (coprocessorClassName != null && newDesc.hasCoprocessor(coprocessorClassName)) { + builder.removeCoprocessor(coprocessorClassName); } - if (coprocessorGCClass != null && newDesc.hasCoprocessor(coprocessorGCClass.getName())) { - builder.removeCoprocessor(coprocessorGCClass.getName()); + if (coprocessorGCClassName != null && newDesc.hasCoprocessor(coprocessorGCClassName)) { + builder.removeCoprocessor(coprocessorGCClassName); } } } @@ -1602,8 +1600,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private static boolean hasTxCoprocessor(TableDescriptor descriptor) { for (TransactionFactory.Provider provider : TransactionFactory.Provider.available()) { - Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); - if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) { + String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName(); + if (coprocessorClassName != null && descriptor.hasCoprocessor(coprocessorClassName)) { return true; } } @@ -1611,8 +1609,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, TableDescriptor existingDesc, TableDescriptor newDesc) { - Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); - return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName())); + String coprocessorClassName = provider.getTransactionProvider().getCoprocessorClassName(); + return (coprocessorClassName != null && existingDesc.hasCoprocessor(coprocessorClassName) && newDesc.hasCoprocessor(coprocessorClassName)); } private void modifyTable(byte[] tableName, TableDescriptor newDesc, boolean shouldPoll) throws IOException, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java index 2daa6dc..a8db0c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/NotAvailableTransactionProvider.java @@ -22,7 +22,6 @@ import java.sql.SQLException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.transaction.TransactionFactory.Provider; @@ -60,21 +59,18 @@ public class NotAvailableTransactionProvider implements PhoenixTransactionProvid } @Override - public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) { - throw new UnsupportedOperationException(message); + public Provider getProvider() { + return TransactionFactory.Provider.TEPHRA; } @Override - public Class<? extends RegionObserver> getCoprocessor() { + public String getCoprocessorClassName() { throw new UnsupportedOperationException(message); } @Override - public Class<? extends RegionObserver> getGCCoprocessor() {return null;} - - @Override - public Provider getProvider() { - return TransactionFactory.Provider.TEPHRA; + public String getGCCoprocessorClassName() { + throw new UnsupportedOperationException(message); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index 9247248..bbddc13 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -23,18 +23,11 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.omid.committable.CommitTable; -import org.apache.omid.committable.InMemoryCommitTable; import org.apache.omid.transaction.HBaseOmidClientConfiguration; import org.apache.omid.transaction.HBaseTransactionManager; import org.apache.omid.transaction.TTable; -import org.apache.omid.tso.TSOMockModule; -import org.apache.omid.tso.TSOServer; -import org.apache.omid.tso.TSOServerConfig; -import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY; import org.apache.omid.tso.client.OmidClientConfiguration; -import org.apache.omid.tso.client.TSOClient; import org.apache.phoenix.coprocessor.OmidGCProcessor; import org.apache.phoenix.coprocessor.OmidTransactionalProcessor; import org.apache.phoenix.exception.SQLExceptionCode; @@ -42,22 +35,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.transaction.TransactionFactory.Provider; -import org.apache.phoenix.util.TransactionUtil; - -import com.google.inject.Guice; -import com.google.inject.Injector; public class OmidTransactionProvider implements PhoenixTransactionProvider { private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider(); - public static final String OMID_TSO_PORT = "phoenix.omid.tso.port"; - public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size"; - public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type"; - public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000; - public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME"; private HBaseTransactionManager transactionManager = null; private volatile CommitTable.Client commitTableClient = null; - private CommitTable.Writer commitTableWriter = null; public static final OmidTransactionProvider getInstance() { return INSTANCE; @@ -120,84 +103,20 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { return commitTableClient; } - @Override - public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException{ - TSOServerConfig tsoConfig = new TSOServerConfig(); - TSOServer tso; - - tsoConfig.setPort(port); - tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE)); - tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE)); - tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString()); - - Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig)); - tso = injector.getInstance(TSOServer.class); - tso.startAsync(); - tso.awaitRunning(); - - OmidClientConfiguration clientConfig = new OmidClientConfiguration(); - clientConfig.setConnectionString("localhost:" + port); - clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); - - InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class); - - try { - // Create the associated Handler - TSOClient client = TSOClient.newInstance(clientConfig); - - HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); - clientConf.setConnectionString("localhost:" + port); - clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); - clientConf.setHBaseConfiguration(config); - commitTableClient = commitTable.getClient(); - commitTableWriter = commitTable.getWriter(); - transactionManager = HBaseTransactionManager.builder(clientConf) - .commitTableClient(commitTableClient) - .commitTableWriter(commitTableWriter) - .tsoClient(client).build(); - } catch (IOException | InterruptedException e) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()).setRootCause(e).build() - .buildException(); - } - - return new OmidTransactionService(tso, transactionManager); - } - - static class OmidTransactionService implements PhoenixTransactionService { - private final HBaseTransactionManager transactionManager; - private TSOServer tso; - - public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) { - this.tso = tso; - this.transactionManager = transactionManager; - } - - public void start() { - - } - - @Override - public void close() throws IOException { - if (transactionManager != null) { - transactionManager.close(); - } - if (tso != null) { - tso.stopAsync(); - tso.awaitTerminated(); - } - } + // For testing only + public void injectTestService(HBaseTransactionManager transactionManager, CommitTable.Client commitTableClient) { + this.transactionManager = transactionManager; + this.commitTableClient = commitTableClient; } @Override - public Class<? extends RegionObserver> getCoprocessor() { - return OmidTransactionalProcessor.class; + public String getCoprocessorClassName() { + return OmidTransactionalProcessor.class.getName(); } @Override - public Class<? extends RegionObserver> getGCCoprocessor() { - return OmidGCProcessor.class; + public String getGCCoprocessorClassName() { + return OmidGCProcessor.class.getName(); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java index f50b6b5..d730c67 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java @@ -22,7 +22,6 @@ import java.sql.SQLException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -50,9 +49,8 @@ public interface PhoenixTransactionProvider { public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException; public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException; - public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException; - public Class<? extends RegionObserver> getCoprocessor(); - public Class<? extends RegionObserver> getGCCoprocessor(); + public String getCoprocessorClassName(); + public String getGCCoprocessorClassName(); public TransactionFactory.Provider getProvider(); public boolean isUnsupported(Feature feature); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java index 9a216ee..7049858 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -33,14 +32,9 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; import org.apache.tephra.distributed.PooledClientProvider; -import org.apache.tephra.distributed.TransactionService; import org.apache.tephra.distributed.TransactionServiceClient; import org.apache.tephra.inmemory.InMemoryTxSystemClient; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.zookeeper.TephraZKClientService; -import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService; import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService; import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies; import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService; @@ -48,7 +42,6 @@ import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices; import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients; import org.apache.tephra.shaded.com.google.common.collect.ArrayListMultimap; -import com.google.inject.util.Providers; public class TephraTransactionProvider implements PhoenixTransactionProvider { private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider(); @@ -110,57 +103,6 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider { return client; } - @Override - public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) { - config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port); - int retryTimeOut = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC); - ZKClientService zkClient = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS) - ) - ) - ); - - DiscoveryService discovery = new ZKDiscoveryService(zkClient); - TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, - new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); - TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); - TephraTransactionService service = new TephraTransactionService(zkClient, txService); - service.start(); - return service; - } - - static class TephraTransactionService implements PhoenixTransactionService { - private final ZKClientService zkClient; - private final TransactionService txService; - - public TephraTransactionService(ZKClientService zkClient, TransactionService txService) { - this.zkClient = zkClient; - this.txService = txService; - } - - public void start() { - zkClient.startAndWait(); - txService.startAndWait(); - } - - @Override - public void close() throws IOException { - try { - if (txService != null) txService.stopAndWait(); - } finally { - if (zkClient != null) zkClient.stopAndWait(); - } - } - - } - static class TephraTransactionClient implements PhoenixTransactionClient { private final ZKClientService zkClient; private final TransactionSystemClient txClient; @@ -188,14 +130,16 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider { } } - + @Override - public Class<? extends RegionObserver> getCoprocessor() { - return TephraTransactionalProcessor.class; + public String getCoprocessorClassName() { + return TephraTransactionalProcessor.class.getName(); } @Override - public Class<? extends RegionObserver> getGCCoprocessor() {return null;} + public String getGCCoprocessorClassName() { + return null; + } @Override public Provider getProvider() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java index ab0d4c9..7f6afcb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -18,7 +18,6 @@ package org.apache.phoenix.transaction; import java.io.IOException; - import org.apache.phoenix.coprocessor.MetaDataProtocol; public class TransactionFactory { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java new file mode 100644 index 0000000..a4a15d8 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/transaction/TestTransactionServiceManager.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.committable.CommitTable.Client; +import org.apache.omid.committable.CommitTable.Writer; +import org.apache.omid.committable.InMemoryCommitTable; +import org.apache.omid.transaction.HBaseOmidClientConfiguration; +import org.apache.omid.transaction.HBaseTransactionManager; +import org.apache.omid.tso.TSOMockModule; +import org.apache.omid.tso.TSOServer; +import org.apache.omid.tso.TSOServerConfig; +import org.apache.omid.tso.TSOServerConfig.WAIT_STRATEGY; +import org.apache.omid.tso.client.OmidClientConfiguration; +import org.apache.omid.tso.client.TSOClient; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.shaded.org.apache.twill.discovery.DiscoveryService; +import org.apache.tephra.shaded.org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.tephra.shaded.org.apache.twill.zookeeper.RetryStrategies; +import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientService; +import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClientServices; +import org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClients; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.util.Providers; + +public class TestTransactionServiceManager { + + public static final String OMID_TSO_PORT = "phoenix.omid.tso.port"; + public static final String OMID_TSO_CONFLICT_MAP_SIZE = "phoenix.omid.tso.conflict.map.size"; + public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type"; + public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000; + public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME"; + + public static OmidTransactionService startAndInjectOmidTransactionService( + OmidTransactionProvider transactionProvider, Configuration config, + ConnectionInfo connectionInfo, int port) throws SQLException { + TSOServerConfig tsoConfig = new TSOServerConfig(); + TSOServer tso; + + tsoConfig.setPort(port); + tsoConfig.setConflictMapSize( + config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE)); + tsoConfig.setTimestampType( + config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE)); + tsoConfig.setWaitStrategy(WAIT_STRATEGY.LOW_CPU.toString()); + + Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig)); + tso = injector.getInstance(TSOServer.class); + tso.startAsync(); + tso.awaitRunning(); + + OmidClientConfiguration clientConfig = new OmidClientConfiguration(); + clientConfig.setConnectionString("localhost:" + port); + clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); + + InMemoryCommitTable commitTable = + (InMemoryCommitTable) injector.getInstance(CommitTable.class); + + HBaseTransactionManager transactionManager; + Client commitTableClient; + Writer commitTableWriter; + try { + // Create the associated Handler + TSOClient client = TSOClient.newInstance(clientConfig); + + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConnectionString("localhost:" + port); + clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); + clientConf.setHBaseConfiguration(config); + commitTableClient = commitTable.getClient(); + commitTableWriter = commitTable.getWriter(); + transactionManager = + HBaseTransactionManager.builder(clientConf).commitTableClient(commitTableClient) + .commitTableWriter(commitTableWriter).tsoClient(client).build(); + } catch (IOException | InterruptedException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build().buildException(); + } + + transactionProvider.injectTestService(transactionManager, commitTableClient); + + return new OmidTransactionService(tso, transactionManager); + } + + public static class OmidTransactionService implements PhoenixTransactionService { + private final HBaseTransactionManager transactionManager; + private TSOServer tso; + + public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) { + this.tso = tso; + this.transactionManager = transactionManager; + } + + public void start() { + + } + + @Override + public void close() throws IOException { + if (transactionManager != null) { + transactionManager.close(); + } + if (tso != null) { + tso.stopAsync(); + tso.awaitTerminated(); + } + } + } + + public static TephraTransactionService startAndInjectTephraTransactionService( + TephraTransactionProvider transactionProvider, + Configuration config, ConnectionInfo connInfo, int port) { + config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port); + int retryTimeOut = + config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC, + TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC); + ZKClientService zkClient = + ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure( + ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS)))); + + DiscoveryService discovery = new ZKDiscoveryService(zkClient); + TransactionManager txManager = + new TransactionManager( + config, new HDFSTransactionStateStorage(config, + new SnapshotCodecProvider(config), new TxMetricsCollector()), + new TxMetricsCollector()); + TransactionService txService = + new TransactionService(config, zkClient, discovery, Providers.of(txManager)); + TephraTransactionService service = new TephraTransactionService(zkClient, txService); + service.start(); + + return service; + } + + public static class TephraTransactionService implements PhoenixTransactionService { + private final ZKClientService zkClient; + private final TransactionService txService; + + public TephraTransactionService(ZKClientService zkClient, TransactionService txService) { + this.zkClient = zkClient; + this.txService = txService; + } + + public void start() { + zkClient.startAndWait(); + txService.startAndWait(); + } + + @Override + public void close() throws IOException { + try { + if (txService != null) txService.stopAndWait(); + } finally { + if (zkClient != null) zkClient.stopAndWait(); + } + } + + } + + public static PhoenixTransactionService startTransactionService(TransactionFactory.Provider provider, Configuration config, ConnectionInfo connInfo, int port) throws SQLException { + PhoenixTransactionProvider transactionProvider = provider.getTransactionProvider(); + if(provider == Provider.TEPHRA) { + return startAndInjectTephraTransactionService((TephraTransactionProvider)transactionProvider, config, connInfo, port); + } else if (provider == Provider.OMID) { + return startAndInjectOmidTransactionService((OmidTransactionProvider)transactionProvider, config, connInfo, port); + } + throw new UnsupportedOperationException("Unknown transaction provider"); + } + +} \ No newline at end of file