Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 968ff97d2 -> e7ce8e815
[PHOENIX-3623] Integrate Omid with Phoenix. This commit finishes the integration of Omid as Phoenix transaction processor engine. More information regarding the integration exists at [PHOENIX-3623] and at [OMID-82], which is the corresponding jira in Omid. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e7ce8e81 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e7ce8e81 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e7ce8e81 Branch: refs/heads/4.x-HBase-1.2 Commit: e7ce8e815adeeb31d5f31c0beca6ffc81218b4f0 Parents: 968ff97 Author: Ohad Shacham <oh...@yahoo-inc.com> Authored: Thu Dec 20 14:15:03 2018 +0200 Committer: Ohad Shacham <oh...@yahoo-inc.com> Committed: Thu Dec 20 14:15:03 2018 +0200 ---------------------------------------------------------------------- bin/omid-env.sh | 43 ++++ bin/omid-server-configuration.yml | 25 +++ bin/omid.sh | 93 ++++++++ phoenix-assembly/pom.xml | 5 + .../components/all-common-dependencies.xml | 28 +++ phoenix-core/pom.xml | 46 ++++ .../phoenix/coprocessor/OmidGCProcessor.java | 7 +- .../coprocessor/OmidTransactionalProcessor.java | 8 +- .../transaction/OmidTransactionContext.java | 217 ++++++++++++++++++- .../transaction/OmidTransactionProvider.java | 106 ++++++++- .../transaction/OmidTransactionTable.java | 64 +++++- .../phoenix/transaction/TransactionFactory.java | 5 +- .../phoenix/query/QueryServicesTestImpl.java | 1 - phoenix-server/pom.xml | 1 + pom.xml | 47 ++++ 15 files changed, 665 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/bin/omid-env.sh ---------------------------------------------------------------------- diff --git a/bin/omid-env.sh b/bin/omid-env.sh new file mode 100644 index 0000000..820cdaa --- /dev/null +++ b/bin/omid-env.sh @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one 
+# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set the flags to pass to the jvm when running omid +# export JVM_FLAGS=-Xmx8096m +# --------------------------------------------------------------------------------------------------------------------- +# Check if HADOOP_CONF_DIR and HBASE_CONF_DIR are set +# --------------------------------------------------------------------------------------------------------------------- +export JVM_FLAGS=-Xmx4096m +if [ -z ${HADOOP_CONF_DIR+x} ]; then + if [ -z ${HADOOP_HOME+x} ]; then + echo "WARNING: HADOOP_HOME or HADOOP_CONF_DIR are unset"; + else + export HADOOP_CONF_DIR=${HADOOP_HOME}/conf + fi +else + echo "HADOOP_CONF_DIR is set to '$HADOOP_CONF_DIR'"; +fi + +if [ -z ${HBASE_CONF_DIR+x} ]; then + if [ -z ${HBASE_HOME+x} ]; then + echo "WARNING: HBASE_HOME or HBASE_CONF_DIR are unset"; + else + export HBASE_CONF_DIR=${HBASE_HOME}/conf + fi +else + echo "HBASE_CONF_DIR is set to '$HBASE_CONF_DIR'"; +fi http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/bin/omid-server-configuration.yml ---------------------------------------------------------------------- diff --git a/bin/omid-server-configuration.yml b/bin/omid-server-configuration.yml new file mode 100644 index 0000000..8d1616e --- /dev/null +++ b/bin/omid-server-configuration.yml @@ 
-0,0 +1,25 @@ +# ===================================================================================================================== +# +# Omid TSO Server Configuration +# --------------------------------------------------------------------------------------------------------------------- +# +# Tune here the default values for TSO server config parameters found in 'default-omid-server-configuration.yml' file +# +# ===================================================================================================================== + + +timestampStoreModule: !!org.apache.omid.timestamp.storage.HBaseTimestampStorageModule [ ] +commitTableStoreModule: !!org.apache.omid.committable.hbase.HBaseCommitTableStorageModule [ ] + +metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [ +!!org.apache.omid.metrics.CodahaleMetricsConfig { + outputFreqInSecs: 10, + reporters: !!set { + !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV + }, + csvDir: "csvMetrics", +} +] + +timestampType: WORLD_TIME +lowLatency: false http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/bin/omid.sh ---------------------------------------------------------------------- diff --git a/bin/omid.sh b/bin/omid.sh new file mode 100755 index 0000000..5b33ed5 --- /dev/null +++ b/bin/omid.sh @@ -0,0 +1,93 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SCRIPTDIR=`dirname $0` +cd $SCRIPTDIR; + +# Load Omid environment variables +source omid-env.sh + +# Configure classpath... +CLASSPATH=./:../conf:${HBASE_CONF_DIR}:${HADOOP_CONF_DIR} + +# ...for source release and... +for j in ../target/omid-tso*.jar; do + CLASSPATH=$CLASSPATH:$j +done + +# and for binary release +for j in ../omid-tso*.jar; do + CLASSPATH=$CLASSPATH:$j +done +for j in ../lib/*.jar; do + CLASSPATH=$CLASSPATH:$j +done + +tso() { + exec java $JVM_FLAGS -cp $CLASSPATH org.apache.omid.tso.TSOServer $@ +} + +tsoRelauncher() { + until ./omid.sh tso $@; do + echo "TSO Server crashed with exit code $?. Re-launching..." >&2 + sleep 1 + done +} + +createHBaseCommitTable() { + java -cp $CLASSPATH org.apache.omid.tools.hbase.OmidTableManager commit-table $@ +} + +createHBaseTimestampTable() { + java -cp $CLASSPATH org.apache.omid.tools.hbase.OmidTableManager timestamp-table $@ +} + +usage() { + echo "Usage: omid.sh <command> <options>" + echo "where <command> is one of:" + echo " tso Starts The Status Oracle server (TSO)" + echo " tso-relauncher Starts The Status Oracle server (TSO) re-launching it if the process exits" + echo " create-hbase-commit-table Creates the hbase commit table." + echo " create-hbase-timestamp-table Creates the hbase timestamp table." 
+} + +# if no args specified, show usage +if [ $# = 0 ]; then + usage; + exit 1 +fi + +COMMAND=$1 +shift + +if [ "$COMMAND" = "tso" ]; then + createHBaseTimestampTable $@; + createHBaseCommitTable $@; + tso $@; +elif [ "$COMMAND" = "tso-relauncher" ]; then + tsoRelauncher $@; +elif [ "$COMMAND" = "create-hbase-commit-table" ]; then + createHBaseCommitTable $@; +elif [ "$COMMAND" = "create-hbase-timestamp-table" ]; then + createHBaseTimestampTable $@; +else + exec java -cp $CLASSPATH $COMMAND $@ +fi + + http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml index 4520625..17ccfef 100644 --- a/phoenix-assembly/pom.xml +++ b/phoenix-assembly/pom.xml @@ -130,5 +130,10 @@ <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-load-balancer</artifactId> </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-hbase-tools-hbase1.x</artifactId> + <version>${omid.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-assembly/src/build/components/all-common-dependencies.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/src/build/components/all-common-dependencies.xml b/phoenix-assembly/src/build/components/all-common-dependencies.xml index 336bc4d..84388cd 100644 --- a/phoenix-assembly/src/build/components/all-common-dependencies.xml +++ b/phoenix-assembly/src/build/components/all-common-dependencies.xml @@ -45,6 +45,34 @@ <include>io.netty:netty</include> <include>commons-codec:commons-codec</include> <include>org.apache.calcite:calcite-avatica*</include> + + <!-- For omid TSO --> + <include>org.apache.omid:omid-tso-server-hbase1.x</include> + <include>org.apache.omid:omid-hbase-common-hbase1.x</include> + 
<include>org.apache.omid:omid-hbase-tools-hbase1.x</include> + <include>org.apache.omid:omid-common</include> + <include>org.apache.omid:omid-metrics</include> + <include>org.apache.omid:omid-timestamp-storage-hbase1.x</include> + <include>org.apache.omid:omid-hbase-shims-hbase1.x</include> + <include>org.apache.omid:omid-commit-table</include> + <include>org.apache.omid:omid-codahale-metrics</include> + <include>org.apache.omid:omid-hbase-commit-table-hbase1.x</include> + <include>org.yaml:snakeyaml</include> + <include>com.google.inject:guice</include> + <include>commons-beanutils:commons-beanutils</include> + <include>javax.inject:javax.inject</include> + <include>aopalliance:aopalliance</include> + <include>org.apache.commons:commons-pool2</include> + <include>com.lmax:disruptor</include> + <include>com.codahale.metrics:metrics-core</include> + <include>com.beust:jcommander</include> + <include>commons-collections:commons-collections</include> + <include>io.netty:netty-all</include> + <include>org.apache.htrace:htrace-core</include> + <include>javax.servlet:javax.servlet-api</include> + <include>commons-cli:commons-cli</include> + <include>com.yammer.metrics:metrics-core</include> + <include>com.codahale.metrics:metrics-graphite</include> </includes> </dependencySet> <!-- Separate dependency set to just pull in the jackson stuff since its test http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index dcdf86e..7fbbcf7 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -195,6 +195,52 @@ </build> <dependencies> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-hbase-client-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + 
<dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-tso-server-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-tso-server-hbase1.x</artifactId> + <version>${omid.version}</version> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Transaction dependencies --> <dependency> <groupId>org.apache.tephra</groupId> http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java index d90a31e..b4a1a0f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java @@ -17,14 +17,15 @@ */ package org.apache.phoenix.coprocessor; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.transaction.OmidCompactor; +import org.apache.omid.transaction.OmidSnapshotFilter; public class OmidGCProcessor extends DelegateRegionObserver { public OmidGCProcessor() { - super(new BaseRegionObserver() { - }); + super(new OmidCompactor(true)); } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java index f977cfe..b84b5ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java @@ -17,14 +17,16 @@ */ package org.apache.phoenix.coprocessor; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.omid.transaction.OmidSnapshotFilter; +import org.apache.phoenix.transaction.OmidTransactionProvider; public class OmidTransactionalProcessor extends DelegateRegionObserver { public OmidTransactionalProcessor() { - super(new BaseRegionObserver() { - }); + // Hack for testing - retrieves the commit table client from the singleton OmidTransactionProvider + // TODO: use real commit table and get port from config + super(new OmidSnapshotFilter(OmidTransactionProvider.getInstance().getCommitTableClient())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 9edc58b..392de78 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -18,89 +18,276 @@ package org.apache.phoenix.transaction; import 
java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.hbase.client.Table; +import org.apache.omid.proto.TSOProto; +import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel; +import org.apache.omid.transaction.HBaseCellId; +import org.apache.omid.transaction.HBaseTransaction; +import org.apache.omid.transaction.HBaseTransactionManager; +import org.apache.omid.transaction.RollbackException; +import org.apache.omid.transaction.Transaction; +import org.apache.omid.transaction.Transaction.Status; +import org.apache.omid.transaction.TransactionException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException; -//import org.apache.omid.tso.TSOMockModule; public class OmidTransactionContext implements PhoenixTransactionContext { + private static final Logger logger = LoggerFactory.getLogger(OmidTransactionContext.class); + + private HBaseTransactionManager tm; + private HBaseTransaction tx; + public OmidTransactionContext() { + this.tx = null; + this.tm = null; } public OmidTransactionContext(PhoenixConnection connection) throws SQLException { + PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider()); + assert (client instanceof OmidTransactionProvider.OmidTransactionClient); + this.tm = ((OmidTransactionProvider.OmidTransactionClient)client).getTransactionClient(); + this.tx = null; } public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException { + this(); + if (txnBytes != null && txnBytes.length > 0) { + TSOProto.Transaction 
transaction = TSOProto.Transaction.parseFrom(txnBytes); + tx = new HBaseTransaction(transaction.getTimestamp(), transaction.getEpoch(), new HashSet<HBaseCellId>(), + new HashSet<HBaseCellId>(), null, tm.isLowLatency()); + } else { + tx = null; + } } public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) { + assert (ctx instanceof OmidTransactionContext); + OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx; + + this.tm = omidTransactionContext.tm; + + if (subTask) { + if (omidTransactionContext.isTransactionRunning()) { + Transaction transaction = omidTransactionContext.getTransaction(); + this.tx = new HBaseTransaction(transaction.getTransactionId(), transaction.getEpoch(), + new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), this.tm, + transaction.getReadTimestamp(), transaction.getWriteTimestamp(), tm.isLowLatency()); + } else { + this.tx = null; + } + + this.tm = null; + } else { + this.tx = omidTransactionContext.getTransaction(); + } } @Override public void begin() throws SQLException { + if (tm == null) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build() + .buildException(); + } + + + try { + tx = (HBaseTransaction) tm.begin(); + } catch (TransactionException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } } @Override public void commit() throws SQLException { + if (tx == null || tm == null) + return; + + try { + tm.commit(tx); + } catch (TransactionException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } catch (RollbackException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } } @Override public void abort() throws 
SQLException { + if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) { + return; + } + + try { + tm.rollback(tx); + } catch (TransactionException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } } @Override public void checkpoint(boolean hasUncommittedData) throws SQLException { + if (hasUncommittedData) { + try { + tx.checkpoint(); + } catch (TransactionException e) { + throw new SQLException(e); + } + } + tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); } @Override public void commitDDLFence(PTable dataTable) throws SQLException { + + try { + tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes()); + if (logger.isInfoEnabled()) { + logger.info("Added write fence at ~" + + tx.getReadTimestamp()); + } + } catch (TransactionException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()).build() + .buildException(); + } } @Override public void join(PhoenixTransactionContext ctx) { + + if (ctx == PhoenixTransactionContext.NULL_CONTEXT) { + return; + } + + assert (ctx instanceof OmidTransactionContext); + OmidTransactionContext omidContext = (OmidTransactionContext) ctx; + + HBaseTransaction transaction = omidContext.getTransaction(); + if (transaction == null || tx == null) return; + + Set<HBaseCellId> writeSet = transaction.getWriteSet(); + + for (HBaseCellId cell : writeSet) { + tx.addWriteSetElement(cell); + } } @Override public boolean isTransactionRunning() { - return false; + return (tx != null); } @Override public void reset() { + tx = null; } @Override public long getTransactionId() { - return 0; + return tx.getTransactionId(); } @Override public long getReadPointer() { - return 0; + return tx.getReadTimestamp(); } @Override public long getWritePointer() { - return 
0; + return tx.getWriteTimestamp(); } @Override public PhoenixVisibilityLevel getVisibilityLevel() { - return null; + VisibilityLevel visibilityLevel = null; + + assert(tx != null); + visibilityLevel = tx.getVisibilityLevel(); + + PhoenixVisibilityLevel phoenixVisibilityLevel; + switch (visibilityLevel) { + case SNAPSHOT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL; + default: + phoenixVisibilityLevel = null; + } + + return phoenixVisibilityLevel; } @Override public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + + VisibilityLevel omidVisibilityLevel = null; + + switch (visibilityLevel) { + case SNAPSHOT: + omidVisibilityLevel = VisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + omidVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + omidVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL; + break; + default: + assert (false); + } + + assert(tx != null); + tx.setVisibilityLevel(omidVisibilityLevel); + } @Override public byte[] encodeTransaction() throws SQLException { - return null; + assert(tx != null); + + TSOProto.Transaction.Builder transactionBuilder = TSOProto.Transaction.newBuilder(); + + transactionBuilder.setTimestamp(tx.getTransactionId()); + transactionBuilder.setEpoch(tx.getEpoch()); + + byte[] encodedTxBytes = transactionBuilder.build().toByteArray(); + // Add code of TransactionProvider at end of byte array + encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1); + encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode(); + return encodedTxBytes; } @Override @@ -110,13 +297,22 @@ public class OmidTransactionContext implements PhoenixTransactionContext { @Override public PhoenixTransactionContext 
newTransactionContext(PhoenixTransactionContext context, boolean subTask) { - return null; + return new OmidTransactionContext(context, subTask); } @Override public void markDMLFence(PTable dataTable) { } + /** + * OmidTransactionContext specific functions + */ + + public HBaseTransaction getTransaction() { + return tx; + } + + @Override public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException { return new OmidTransactionTable(this, htable, isConflictFree); @@ -124,6 +320,9 @@ public class OmidTransactionContext implements PhoenixTransactionContext { @Override public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException { - return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex); + // When we're getting a table for writing, if the table being written to is an index, + // write the shadow cells immediately since the only time we write to an index is + // when we initially populate it synchronously. 
+ return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex, isIndex); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index bace2bc..87d7225 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -24,11 +24,27 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.committable.InMemoryCommitTable; +import org.apache.omid.transaction.HBaseOmidClientConfiguration; +import org.apache.omid.transaction.HBaseTransactionManager; +import org.apache.omid.transaction.TTable; +import org.apache.omid.tso.TSOMockModule; +import org.apache.omid.tso.TSOServer; +import org.apache.omid.tso.TSOServerConfig; +import org.apache.omid.tso.client.OmidClientConfiguration; +import org.apache.omid.tso.client.TSOClient; import org.apache.phoenix.coprocessor.OmidGCProcessor; import org.apache.phoenix.coprocessor.OmidTransactionalProcessor; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.apache.phoenix.util.TransactionUtil; + +import com.google.inject.Guice; +import com.google.inject.Injector; public class OmidTransactionProvider 
implements PhoenixTransactionProvider { private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider(); @@ -38,6 +54,10 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000; public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME"; + private HBaseTransactionManager transactionManager = null; + private volatile CommitTable.Client commitTableClient = null; + private CommitTable.Writer commitTableWriter = null; + public static final OmidTransactionProvider getInstance() { return INSTANCE; } @@ -63,26 +83,106 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { @Override public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException{ - return new OmidTransactionClient(); + if (transactionManager == null) { + try { + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); + transactionManager = (HBaseTransactionManager) HBaseTransactionManager.newInstance(clientConf); + } catch (IOException | InterruptedException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + } + + return new OmidTransactionClient(transactionManager); } static class OmidTransactionClient implements PhoenixTransactionClient { + private final HBaseTransactionManager transactionManager; + + public OmidTransactionClient(HBaseTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + public HBaseTransactionManager getTransactionClient() { + return transactionManager; + } + @Override public void close() throws IOException {} } + // For testing only + public CommitTable.Client getCommitTableClient() { + return commitTableClient; 
+ } + @Override public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo, int port) throws SQLException{ - return new OmidTransactionService(); + TSOServerConfig tsoConfig = new TSOServerConfig(); + TSOServer tso; + + tsoConfig.setPort(port); + tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE)); + tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, DEFAULT_OMID_TSO_TIMESTAMP_TYPE)); + + Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig)); + tso = injector.getInstance(TSOServer.class); + tso.startAndWait(); + + OmidClientConfiguration clientConfig = new OmidClientConfiguration(); + clientConfig.setConnectionString("localhost:" + port); + clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); + + InMemoryCommitTable commitTable = (InMemoryCommitTable) injector.getInstance(CommitTable.class); + + try { + // Create the associated Handler + TSOClient client = TSOClient.newInstance(clientConfig); + + HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration(); + clientConf.setConnectionString("localhost:" + port); + clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW); + clientConf.setHBaseConfiguration(config); + commitTableClient = commitTable.getClient(); + commitTableWriter = commitTable.getWriter(); + transactionManager = HBaseTransactionManager.builder(clientConf) + .commitTableClient(commitTableClient) + .commitTableWriter(commitTableWriter) + .tsoClient(client).build(); + } catch (IOException | InterruptedException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + + return new OmidTransactionService(tso, transactionManager); } static class OmidTransactionService implements PhoenixTransactionService { + private final 
HBaseTransactionManager transactionManager; + private TSOServer tso; + + public OmidTransactionService(TSOServer tso, HBaseTransactionManager transactionManager) { + this.tso = tso; + this.transactionManager = transactionManager; + } public void start() { + } @Override public void close() throws IOException { + if (transactionManager != null) { + transactionManager.close(); + } + if (tso != null) { + tso.stopAndWait(); + } } } @@ -108,6 +208,6 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { @Override public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) { - return put; + return TTable.markPutAsCommitted(put, timestamp, timestamp); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java index cba1d56..964a1f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -19,6 +19,7 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -41,6 +42,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.omid.transaction.TTable; +import org.apache.omid.transaction.Transaction; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import 
com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; @@ -48,80 +53,115 @@ import com.google.protobuf.Service; import com.google.protobuf.ServiceException; /** Table implementation that forwards its operations to an Omid TTable, passing the transaction tx on the calls that accept one. */ public class OmidTransactionTable implements Table { + // Copied from HBase ProtobufUtil since it's not accessible + final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true); + + private TTable tTable; + private Transaction tx; + private final boolean addShadowCells; /* Creates an instance with no backing TTable and no transaction (both null). */ public OmidTransactionTable() throws SQLException { + this.tTable = null; + this.tx = null; + this.addShadowCells = false; } /* Same as the three-argument constructor with isConflictFree = false. */ public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException { + this(ctx, hTable, false); } /* Same as the four-argument constructor with addShadowCells = false. */ public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException { + this(ctx, hTable, isConflictFree, false); + } + + /* Wraps hTable in a TTable and captures the transaction held by ctx. ctx is cast to OmidTransactionContext; the type is only checked by an assert. An IOException from the TTable constructor is rethrown as an SQLException with code TRANSACTION_FAILED. The literal true is a TTable constructor flag whose meaning is not shown here. */ public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree, boolean addShadowCells) throws SQLException { + assert(ctx instanceof OmidTransactionContext); + + OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx; + this.addShadowCells = addShadowCells; + try { + tTable = new TTable(hTable, true, isConflictFree); + } catch (IOException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + + this.tx = omidTransactionContext.getTransaction(); } @Override public Result get(Get get) throws IOException { - return null; + return tTable.get(tx, get); } @Override public void put(Put put) throws IOException { + /* addShadowCells is forwarded unchanged to the TTable call. */ tTable.put(tx, put, addShadowCells); } @Override public void delete(Delete delete) throws IOException { + tTable.delete(tx, delete); } @Override public ResultScanner getScanner(Scan scan) throws IOException { - return null; + /* Overwrites the time range on the caller's Scan (0 to Long.MAX_VALUE) before delegating; the Scan passed in is mutated. NOTE(review): presumably so that visibility is decided by Omid rather than by the HBase time range - confirm. */ scan.setTimeRange(0, Long.MAX_VALUE); + return tTable.getScanner(tx, scan); } @Override public 
Configuration getConfiguration() { - return null; + /* Delegates to the wrapped TTable; no transaction argument is involved. */ return tTable.getConfiguration(); } @Override public HTableDescriptor getTableDescriptor() throws IOException { - return null; + return tTable.getTableDescriptor(); } @Override public boolean exists(Get get) throws IOException { - return false; + /* Passes tx to the TTable call, like the other read and write methods below. */ return tTable.exists(tx, get); } @Override public Result[] get(List<Get> gets) throws IOException { - return null; + return tTable.get(tx, gets); } @Override public ResultScanner getScanner(byte[] family) throws IOException { - return null; + return tTable.getScanner(tx, family); } @Override public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - return null; + return tTable.getScanner(tx, family, qualifier); } @Override public void put(List<Put> puts) throws IOException { + /* addShadowCells is forwarded unchanged to the TTable call. */ tTable.put(tx, puts, addShadowCells); } @Override public void delete(List<Delete> deletes) throws IOException { + tTable.delete(tx, deletes); } @Override public void close() throws IOException { + /* Closes the wrapped TTable; tx is not touched here. */ tTable.close(); } @Override public TableName getName() { - return null; + /* TTable returns the name as bytes; convert it to a TableName. */ byte[] name = tTable.getTableName(); + return TableName.valueOf(name); } @Override @@ -132,12 +172,16 @@ public class OmidTransactionTable implements Table { @Override public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + /* The TTable batch call takes no results array, so after it returns every slot of the caller's array is filled with the shared EMPTY_RESULT_EXISTS_TRUE placeholder. results must be non-null: Arrays.fill throws NullPointerException otherwise. */ tTable.batch(tx, actions, addShadowCells); + Arrays.fill(results, EMPTY_RESULT_EXISTS_TRUE); } @Override public Object[] batch(List<? 
extends Row> actions) throws IOException, InterruptedException { - return null; + /* Allocate one result slot per action and let the two-argument overload fill it. */ + Object[] results = new Object[actions.size()]; + batch(actions, results); + return results; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java index 3e9182f..0f10b37 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -22,10 +22,11 @@ import java.io.IOException; import org.apache.phoenix.coprocessor.MetaDataProtocol; + public class TransactionFactory { public enum Provider { TEPHRA((byte)1, TephraTransactionProvider.getInstance(), true), - OMID((byte)2, OmidTransactionProvider.getInstance(), false); + OMID((byte)2, OmidTransactionProvider.getInstance(), true); private final byte code; private final PhoenixTransactionProvider provider; @@ -49,7 +50,7 @@ public class TransactionFactory { } public static Provider getDefault() { - return TEPHRA; + /* NOTE(review): this switches the default provider from TEPHRA to OMID - confirm that is intended for every caller of getDefault(). */ return OMID; } public PhoenixTransactionProvider getTransactionProvider() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 2049390..59e7fd3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -22,7 +22,6 
@@ import static org.apache.phoenix.query.QueryServicesOptions.withDefaults; import org.apache.curator.shaded.com.google.common.io.Files; import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec; -import org.apache.phoenix.transaction.OmidTransactionProvider; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.TestUtil; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/phoenix-server/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-server/pom.xml b/phoenix-server/pom.xml index aaa0018..ac3d3b3 100644 --- a/phoenix-server/pom.xml +++ b/phoenix-server/pom.xml @@ -122,6 +122,7 @@ <include>org.iq80.snappy:snappy</include> <include>org.antlr:antlr*</include> <include>org.apache.tephra:tephra*</include> + <include>org.apache.omid:omid*</include> <include>com.google.code.gson:gson</include> <include>org.jruby.joni:joni</include> <include>org.jruby.jcodings:jcodings</include> http://git-wip-us.apache.org/repos/asf/phoenix/blob/e7ce8e81/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2ff8157..99b24fb 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,7 @@ <avatica.version>1.12.0</avatica.version> <jettyVersion>8.1.7.v20120910</jettyVersion> <tephra.version>0.15.0-incubating</tephra.version> + <omid.version>1.0.0</omid.version> <spark.version>2.3.2</spark.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> @@ -804,6 +805,52 @@ <artifactId>tephra-hbase-compat-1.1</artifactId> <version>${tephra.version}</version> </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-hbase-client-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + 
<groupId>org.apache.omid</groupId> + <artifactId>omid-hbase-coprocessor-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-tso-server-hbase1.x</artifactId> + <version>${omid.version}</version> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.omid</groupId> + <artifactId>omid-tso-server-hbase1.x</artifactId> + <version>${omid.version}</version> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Make sure we have all the antlr dependencies --> <dependency>