Repository: phoenix
Updated Branches:
  refs/heads/master 85ab5d616 -> 412e07891


[PHOENIX-3623] Integrate Omid with Phoenix.

This commit finishes the integration of Omid as Phoenix transaction processor 
engine.
More information regarding the integration exists at [PHOENIX-3623] and at 
[OMID-82], which is the corresponding jira in Omid.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/412e0789
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/412e0789
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/412e0789

Branch: refs/heads/master
Commit: 412e078915c6a0a4c0b8f86a1ae7f22520f41824
Parents: 85ab5d6
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Thu Dec 20 13:53:09 2018 +0200
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Thu Dec 20 13:53:09 2018 +0200

----------------------------------------------------------------------
 bin/omid-env.sh                                 |  43 ++++
 bin/omid-server-configuration.yml               |  25 +++
 bin/omid.sh                                     |  93 ++++++++
 phoenix-assembly/pom.xml                        |   5 +
 .../components/all-common-dependencies.xml      |  28 +++
 phoenix-core/pom.xml                            |  46 ++++
 .../phoenix/coprocessor/OmidGCProcessor.java    |   6 +-
 .../coprocessor/OmidTransactionalProcessor.java |   7 +-
 .../transaction/OmidTransactionContext.java     | 217 ++++++++++++++++++-
 .../transaction/OmidTransactionProvider.java    | 106 ++++++++-
 .../transaction/OmidTransactionTable.java       |  60 ++++-
 .../phoenix/transaction/TransactionFactory.java |   5 +-
 .../phoenix/query/QueryServicesTestImpl.java    |   1 -
 pom.xml                                         |   1 +
 14 files changed, 615 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/bin/omid-env.sh
----------------------------------------------------------------------
diff --git a/bin/omid-env.sh b/bin/omid-env.sh
new file mode 100644
index 0000000..820cdaa
--- /dev/null
+++ b/bin/omid-env.sh
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set the flags to pass to the jvm when running omid
+# export JVM_FLAGS=-Xmx8096m
+# 
---------------------------------------------------------------------------------------------------------------------
+# Check if HADOOP_CONF_DIR and HBASE_CONF_DIR are set
+# 
---------------------------------------------------------------------------------------------------------------------
+export JVM_FLAGS=-Xmx4096m
+if [ -z ${HADOOP_CONF_DIR+x} ]; then
+    if [ -z ${HADOOP_HOME+x} ]; then
+        echo "WARNING: HADOOP_HOME or HADOOP_CONF_DIR are unset";
+    else
+        export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
+    fi
+else
+    echo "HADOOP_CONF_DIR is set to '$HADOOP_CONF_DIR'";
+fi
+
+if [ -z ${HBASE_CONF_DIR+x} ]; then
+    if [ -z ${HBASE_HOME+x} ]; then
+        echo "WARNING: HBASE_HOME or HBASE_CONF_DIR are unset";
+    else
+        export HBASE_CONF_DIR=${HBASE_HOME}/conf
+    fi
+else
+    echo "HBASE_CONF_DIR is set to '$HBASE_CONF_DIR'";
+fi

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/bin/omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/bin/omid-server-configuration.yml 
b/bin/omid-server-configuration.yml
new file mode 100644
index 0000000..8d1616e
--- /dev/null
+++ b/bin/omid-server-configuration.yml
@@ -0,0 +1,25 @@
+# 
=====================================================================================================================
+#
+# Omid TSO Server Configuration
+# 
---------------------------------------------------------------------------------------------------------------------
+#
+# Tune here the default values for TSO server config parameters found in 
'default-omid-server-configuration.yml' file
+#
+# 
=====================================================================================================================
+
+
+timestampStoreModule: 
!!org.apache.omid.timestamp.storage.HBaseTimestampStorageModule [ ]
+commitTableStoreModule: 
!!org.apache.omid.committable.hbase.HBaseCommitTableStorageModule [ ]
+
+metrics: !!org.apache.omid.metrics.CodahaleMetricsProvider [
+!!org.apache.omid.metrics.CodahaleMetricsConfig {
+  outputFreqInSecs: 10,
+  reporters: !!set {
+    !!org.apache.omid.metrics.CodahaleMetricsConfig$Reporter CSV
+  },
+  csvDir: "csvMetrics",
+}
+]
+
+timestampType: WORLD_TIME
+lowLatency: false

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/bin/omid.sh
----------------------------------------------------------------------
diff --git a/bin/omid.sh b/bin/omid.sh
new file mode 100644
index 0000000..5b33ed5
--- /dev/null
+++ b/bin/omid.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SCRIPTDIR=`dirname $0`
+cd $SCRIPTDIR;
+
+# Load Omid environment variables
+source omid-env.sh
+
+# Configure classpath...
+CLASSPATH=./:../conf:${HBASE_CONF_DIR}:${HADOOP_CONF_DIR}
+
+# ...for source release and...
+for j in ../target/omid-tso*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+
+# and for binary release
+for j in ../omid-tso*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+for j in ../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$j
+done
+
+tso() {
+    exec java $JVM_FLAGS -cp $CLASSPATH org.apache.omid.tso.TSOServer $@
+}
+
+tsoRelauncher() {
+    until ./omid.sh tso $@; do
+        echo "TSO Server crashed with exit code $?.  Re-launching..." >&2
+        sleep 1
+    done
+}
+
+createHBaseCommitTable() {
+    java -cp $CLASSPATH org.apache.omid.tools.hbase.OmidTableManager 
commit-table $@
+}
+
+createHBaseTimestampTable() {
+    java -cp $CLASSPATH org.apache.omid.tools.hbase.OmidTableManager 
timestamp-table $@
+}
+
+usage() {
+    echo "Usage: omid.sh <command> <options>"
+    echo "where <command> is one of:"
+    echo "  tso                           Starts The Status Oracle server 
(TSO)"
+    echo "  tso-relauncher                Starts The Status Oracle server 
(TSO) re-launching it if the process exits"
+    echo "  create-hbase-commit-table     Creates the hbase commit table."
+    echo "  create-hbase-timestamp-table  Creates the hbase timestamp table."
+}
+
+# if no args specified, show usage
+if [ $# = 0 ]; then
+    usage;
+    exit 1
+fi
+
+COMMAND=$1
+shift
+
+if [ "$COMMAND" = "tso" ]; then
+    createHBaseTimestampTable $@;
+    createHBaseCommitTable $@;
+    tso $@;
+elif [ "$COMMAND" = "tso-relauncher" ]; then
+    tsoRelauncher $@;
+elif [ "$COMMAND" = "create-hbase-commit-table" ]; then
+    createHBaseCommitTable $@;
+elif [ "$COMMAND" = "create-hbase-timestamp-table" ]; then
+    createHBaseTimestampTable $@;
+else
+    exec java -cp $CLASSPATH $COMMAND $@
+fi
+
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index d143d93..8efcafd 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -130,5 +130,10 @@
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-load-balancer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.omid</groupId>
+      <artifactId>omid-hbase-tools-hbase2.x</artifactId>
+      <version>${omid.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-assembly/src/build/components/all-common-dependencies.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-dependencies.xml 
b/phoenix-assembly/src/build/components/all-common-dependencies.xml
index 336bc4d..47ec86b 100644
--- a/phoenix-assembly/src/build/components/all-common-dependencies.xml
+++ b/phoenix-assembly/src/build/components/all-common-dependencies.xml
@@ -45,6 +45,34 @@
         <include>io.netty:netty</include>
         <include>commons-codec:commons-codec</include>
         <include>org.apache.calcite:calcite-avatica*</include>
+
+        <!-- For omid TSO -->
+        <include>org.apache.omid:omid-tso-server-hbase2.x</include>
+        <include>org.apache.omid:omid-hbase-common-hbase2.x</include>
+        <include>org.apache.omid:omid-hbase-tools-hbase2.x</include>
+        <include>org.apache.omid:omid-common</include>
+        <include>org.apache.omid:omid-metrics</include>
+        <include>org.apache.omid:omid-timestamp-storage-hbase2.x</include>
+        <include>org.apache.omid:omid-hbase-shims-hbase2.x</include>
+        <include>org.apache.omid:omid-commit-table</include>
+        <include>org.apache.omid:omid-codahale-metrics</include>
+        <include>org.apache.omid:omid-hbase-commit-table-hbase2.x</include>
+        <include>org.yaml:snakeyaml</include>
+        <include>com.google.inject:guice</include>
+        <include>commons-beanutils:commons-beanutils</include>
+        <include>javax.inject:javax.inject</include>
+        <include>aopalliance:aopalliance</include>
+        <include>org.apache.commons:commons-pool2</include>
+        <include>com.lmax:disruptor</include>
+        <include>com.codahale.metrics:metrics-core</include>
+        <include>com.beust:jcommander</include>
+        <include>commons-collections:commons-collections</include>
+        <include>io.netty:netty-all</include>
+        <include>org.apache.htrace:htrace-core</include>
+        <include>javax.servlet:javax.servlet-api</include>
+        <include>commons-cli:commons-cli</include>
+        <include>com.yammer.metrics:metrics-core</include>
+        <include>com.codahale.metrics:metrics-graphite</include>
       </includes>
     </dependencySet>
     <!-- Separate dependency set to just pull in the jackson stuff since its 
test 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index f220bfb..8056ff2 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -195,6 +195,52 @@
   </build>
 
   <dependencies>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-hbase-client-hbase2.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-hbase-coprocessor-hbase2.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-tso-server-hbase2.x</artifactId>
+    <version>${omid.version}</version>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.omid</groupId>
+    <artifactId>omid-tso-server-hbase2.x</artifactId>
+    <version>${omid.version}</version>
+    <type>test-jar</type>
+    <exclusions>
+      <exclusion>
+        <groupId>org.testng</groupId>
+        <artifactId>testng</artifactId>
+      </exclusion>
+    </exclusions>
+  </dependency>
+
     <!-- Transaction dependencies -->
     <dependency>
       <groupId>org.apache.tephra</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
index 70658fb..b4a1a0f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
@@ -17,13 +17,15 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.transaction.OmidCompactor;
+import org.apache.omid.transaction.OmidSnapshotFilter;
 
 
 public class OmidGCProcessor extends DelegateRegionObserver {
 
     public OmidGCProcessor() {
-        super(new BaseRegionObserver());
+        super(new OmidCompactor(true));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
index fc246d4..b84b5ae 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
@@ -17,13 +17,16 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.omid.transaction.OmidSnapshotFilter;
+import org.apache.phoenix.transaction.OmidTransactionProvider;
 
 
 public class OmidTransactionalProcessor extends DelegateRegionObserver {
 
     public OmidTransactionalProcessor() {
-        super(new BaseRegionObserver());
+        // Hack for testing - retrieves the commit table client from the 
singleton OmidTransactionProvider
+        // TODO: use real commit table and get port from config
+        super(new 
OmidSnapshotFilter(OmidTransactionProvider.getInstance().getCommitTableClient()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 9edc58b..b594cd3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -18,89 +18,276 @@
 package org.apache.phoenix.transaction;
 
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.omid.proto.TSOProto;
+import org.apache.omid.transaction.AbstractTransaction.VisibilityLevel;
+import org.apache.omid.transaction.HBaseCellId;
+import org.apache.omid.transaction.HBaseTransaction;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.RollbackException;
+import org.apache.omid.transaction.Transaction;
+import org.apache.omid.transaction.Transaction.Status;
+import org.apache.omid.transaction.TransactionException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-//import org.apache.omid.tso.TSOMockModule;
 
 public class OmidTransactionContext implements PhoenixTransactionContext {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(OmidTransactionContext.class);
+
+    private HBaseTransactionManager tm;
+    private HBaseTransaction tx;
+
     public OmidTransactionContext() {
+        this.tx = null;
+        this.tm = null;
     }
 
     public OmidTransactionContext(PhoenixConnection connection) throws 
SQLException {
+        PhoenixTransactionClient client = 
connection.getQueryServices().initTransactionClient(getProvider());
+        assert (client instanceof 
OmidTransactionProvider.OmidTransactionClient);
+        this.tm = 
((OmidTransactionProvider.OmidTransactionClient)client).getTransactionClient();
+        this.tx = null;
     }
 
     public OmidTransactionContext(byte[] txnBytes) throws 
InvalidProtocolBufferException {
+        this();
+        if (txnBytes != null && txnBytes.length > 0) {
+            TSOProto.Transaction transaction = 
TSOProto.Transaction.parseFrom(txnBytes);
+            tx = new HBaseTransaction(transaction.getTimestamp(), 
transaction.getEpoch(), new HashSet<HBaseCellId>(),
+                    new HashSet<HBaseCellId>(), null, tm.isLowLatency());
+        } else {
+            tx = null;
+        }
     }
 
     public OmidTransactionContext(PhoenixTransactionContext ctx, boolean 
subTask) {
+        assert (ctx instanceof OmidTransactionContext);
+        OmidTransactionContext omidTransactionContext = 
(OmidTransactionContext) ctx;
+
+        this.tm = omidTransactionContext.tm;
+
+        if (subTask) {
+            if (omidTransactionContext.isTransactionRunning()) {
+                Transaction transaction = 
omidTransactionContext.getTransaction();
+                this.tx = new HBaseTransaction(transaction.getTransactionId(), 
transaction.getEpoch(),
+                        new HashSet<HBaseCellId>(), new 
HashSet<HBaseCellId>(), this.tm,
+                        transaction.getReadTimestamp(), 
transaction.getWriteTimestamp(), tm.isLowLatency());
+            } else {
+                this.tx = null;
+            }
+
+            this.tm = null;
+        } else {
+            this.tx = omidTransactionContext.getTransaction();
+        }
     }
 
     @Override
     public void begin() throws SQLException {
+        if (tm == null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
+                    .buildException();
+        }
+
+
+        try {
+            tx = (HBaseTransaction) tm.begin();
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void commit() throws SQLException {
+        if (tx == null || tm == null)
+            return;
+
+        try {
+            tm.commit(tx);
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        } catch (RollbackException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void abort() throws SQLException {
+        if (tx == null || tm == null || tx.getStatus() != Status.RUNNING) {
+            return;
+        }
+
+        try {
+            tm.rollback(tx);
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
     }
 
     @Override
     public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                tx.checkpoint();
+            } catch (TransactionException e) {
+                throw new SQLException(e);
+            }
+        }
+        tx.setVisibilityLevel(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
     }
 
     @Override
     public void commitDDLFence(PTable dataTable) throws SQLException {
+
+        try {
+            tx = (HBaseTransaction) tm.fence(dataTable.getName().getBytes());
+            if (logger.isInfoEnabled()) {
+                logger.info("Added write fence at ~"
+                        + tx.getReadTimestamp());
+            }
+        } catch (TransactionException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+            .setSchemaName(dataTable.getSchemaName().getString())
+            .setTableName(dataTable.getTableName().getString()).build()
+            .buildException();
+        }
     }
 
     @Override
     public void join(PhoenixTransactionContext ctx) {
+
+        if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+            return;
+        }
+
+        assert (ctx instanceof OmidTransactionContext);
+        OmidTransactionContext omidContext = (OmidTransactionContext) ctx;
+
+        HBaseTransaction transaction = omidContext.getTransaction();
+        if (transaction == null || tx == null) return;
+
+        Set<HBaseCellId> writeSet = transaction.getWriteSet();
+
+        for (HBaseCellId cell : writeSet) {
+            tx.addWriteSetElement(cell);
+        }
     }
 
     @Override
     public boolean isTransactionRunning() {
-        return false;
+        return (tx != null);
     }
 
     @Override
     public void reset() {
+        tx = null;
     }
 
     @Override
     public long getTransactionId() {
-        return 0;
+        return tx.getTransactionId();
     }
 
     @Override
     public long getReadPointer() {
-        return 0;
+        return tx.getReadTimestamp();
     }
 
     @Override
     public long getWritePointer() {
-        return 0;
+        return tx.getWriteTimestamp();
     }
 
     @Override
     public PhoenixVisibilityLevel getVisibilityLevel() {
-        return null;
+        VisibilityLevel visibilityLevel = null;
+
+        assert(tx != null);
+        visibilityLevel = tx.getVisibilityLevel();
+
+        PhoenixVisibilityLevel phoenixVisibilityLevel;
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            phoenixVisibilityLevel = 
PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
+        default:
+            phoenixVisibilityLevel = null;
+        }
+
+        return phoenixVisibilityLevel;
     }
 
     @Override
     public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+
+        VisibilityLevel omidVisibilityLevel = null;
+
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            omidVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
+            break;
+        default:
+            assert (false);
+        }
+
+        assert(tx != null);
+        tx.setVisibilityLevel(omidVisibilityLevel);
+
     }
 
     @Override
     public byte[] encodeTransaction() throws SQLException {
-        return null;
+        assert(tx != null);
+
+        TSOProto.Transaction.Builder transactionBuilder = 
TSOProto.Transaction.newBuilder();
+
+        transactionBuilder.setTimestamp(tx.getTransactionId());
+        transactionBuilder.setEpoch(tx.getEpoch());
+
+        byte[] encodedTxBytes = transactionBuilder.build().toByteArray();
+        // Add code of TransactionProvider at end of byte array
+        encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 
1);
+        encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
+        return encodedTxBytes;
     }
 
     @Override
@@ -110,13 +297,22 @@ public class OmidTransactionContext implements 
PhoenixTransactionContext {
 
     @Override
     public PhoenixTransactionContext 
newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
-        return null;
+        return new OmidTransactionContext(context, subTask);
     }
 
     @Override
     public void markDMLFence(PTable dataTable) {
     }
 
+    /**
+    *  OmidTransactionContext specific functions
+    */
+
+    public HBaseTransaction getTransaction() {
+        return tx;
+    }
+
+
     @Override
     public Table getTransactionalTable(Table htable, boolean isConflictFree) 
throws SQLException {
         return new OmidTransactionTable(this, htable, isConflictFree);
@@ -124,6 +320,9 @@ public class OmidTransactionContext implements 
PhoenixTransactionContext {
 
     @Override
     public Table getTransactionalTableWriter(PhoenixConnection connection, 
PTable table, Table htable, boolean isIndex) throws SQLException {
-        return new OmidTransactionTable(this, htable, table.isImmutableRows() 
|| isIndex);
+        // When we're getting a table for writing, if the table being written 
to is an index,
+        // write the shadow cells immediately since the only time we write to 
an index is
+        // when we initially populate it synchronously.
+        return new OmidTransactionTable(this, htable, table.isImmutableRows() 
|| isIndex, isIndex);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index bace2bc..be6145d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -24,11 +24,27 @@ import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.transaction.HBaseOmidClientConfiguration;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration;
+import org.apache.omid.tso.client.TSOClient;
 import org.apache.phoenix.coprocessor.OmidGCProcessor;
 import org.apache.phoenix.coprocessor.OmidTransactionalProcessor;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.TransactionUtil;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 
 public class OmidTransactionProvider implements PhoenixTransactionProvider {
     private static final OmidTransactionProvider INSTANCE = new 
OmidTransactionProvider();
@@ -38,6 +54,10 @@ public class OmidTransactionProvider implements 
PhoenixTransactionProvider {
     public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000;
     public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME";
 
+    private HBaseTransactionManager transactionManager = null;
+    private volatile CommitTable.Client commitTableClient = null;
+    private CommitTable.Writer commitTableWriter = null;
+
     public static final OmidTransactionProvider getInstance() {
         return INSTANCE;
     }
@@ -63,26 +83,106 @@ public class OmidTransactionProvider implements 
PhoenixTransactionProvider {
 
     @Override
     public PhoenixTransactionClient getTransactionClient(Configuration config, 
ConnectionInfo connectionInfo) throws SQLException{
-        return new OmidTransactionClient();
+        if (transactionManager == null) {
+            try {
+                HBaseOmidClientConfiguration clientConf = new 
HBaseOmidClientConfiguration();
+                
clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+                transactionManager = (HBaseTransactionManager) 
HBaseTransactionManager.newInstance(clientConf);
+            } catch (IOException | InterruptedException e) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.TRANSACTION_FAILED)
+                        .setMessage(e.getMessage()).setRootCause(e).build()
+                        .buildException();
+            }
+        }
+
+        return new OmidTransactionClient(transactionManager);
     }
 
     static class OmidTransactionClient implements PhoenixTransactionClient {
+        private final HBaseTransactionManager transactionManager;
+
+        public OmidTransactionClient(HBaseTransactionManager 
transactionManager) {
+            this.transactionManager = transactionManager;
+        }
+
+        public HBaseTransactionManager getTransactionClient() {
+            return transactionManager;
+        }
+
         @Override
         public void close() throws IOException {}
     }
 
+    // For testing only
+    public CommitTable.Client getCommitTableClient() {
+        return commitTableClient;
+    }
+
     @Override
     public PhoenixTransactionService getTransactionService(Configuration 
config, ConnectionInfo connectionInfo, int port) throws  SQLException{
-        return new OmidTransactionService();
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        TSOServer tso;
+
+        tsoConfig.setPort(port);
+        tsoConfig.setConflictMapSize(config.getInt(OMID_TSO_CONFLICT_MAP_SIZE, 
DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE));
+        tsoConfig.setTimestampType(config.get(OMID_TSO_TIMESTAMP_TYPE, 
DEFAULT_OMID_TSO_TIMESTAMP_TYPE));
+
+        Injector injector = Guice.createInjector(new TSOMockModule(tsoConfig));
+        tso = injector.getInstance(TSOServer.class);
+        tso.startAndWait();
+
+        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
+        clientConfig.setConnectionString("localhost:" + port);
+        
clientConfig.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+
+        InMemoryCommitTable commitTable = (InMemoryCommitTable) 
injector.getInstance(CommitTable.class);
+
+        try {
+            // Create the associated Handler
+            TSOClient client = TSOClient.newInstance(clientConfig);
+
+            HBaseOmidClientConfiguration clientConf = new 
HBaseOmidClientConfiguration();
+            clientConf.setConnectionString("localhost:" + port);
+            
clientConf.setConflictAnalysisLevel(OmidClientConfiguration.ConflictDetectionLevel.ROW);
+            clientConf.setHBaseConfiguration(config);
+            commitTableClient = commitTable.getClient();
+            commitTableWriter = commitTable.getWriter();
+            transactionManager = HBaseTransactionManager.builder(clientConf)
+                    .commitTableClient(commitTableClient)
+                    .commitTableWriter(commitTableWriter)
+                    .tsoClient(client).build();
+        } catch (IOException | InterruptedException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
+
+        return new OmidTransactionService(tso, transactionManager);
     }
 
     static class OmidTransactionService implements PhoenixTransactionService {
+        private final HBaseTransactionManager transactionManager;
+        private TSOServer tso;
+
+        public OmidTransactionService(TSOServer tso, HBaseTransactionManager 
transactionManager) {
+            this.tso = tso;
+            this.transactionManager = transactionManager;
+        }
 
         public void start() {
+
         }
 
         @Override
         public void close() throws IOException {
+            if (transactionManager != null) {
+                transactionManager.close();
+            }
+            if (tso != null) {
+                tso.stopAndWait();
+            }
         }
     }
 
@@ -108,6 +208,6 @@ public class OmidTransactionProvider implements 
PhoenixTransactionProvider {
 
     @Override
     public Put markPutAsCommitted(Put put, long timestamp, long 
commitTimestamp) {
-        return put;
+        return TTable.markPutAsCommitted(put, timestamp, timestamp);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
index 9fbfbc7..d27348d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.transaction.Transaction;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
@@ -51,80 +56,115 @@ import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
 public class OmidTransactionTable implements Table {
+    // Copied from HBase ProtobufUtil since it's not accessible
+    final static Result EMPTY_RESULT_EXISTS_TRUE = Result.create(null, true);
+
+    private TTable tTable;
+    private Transaction tx;
+    private final boolean addShadowCells;
 
     public OmidTransactionTable() throws SQLException {
+        this.tTable = null;
+        this.tx = null;
+        this.addShadowCells = false;
     }
 
     public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) 
throws SQLException {
+        this(ctx, hTable, false);
     }
 
     public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, 
boolean isConflictFree) throws SQLException  {
+        this(ctx, hTable, isConflictFree, false);
+    }
+
+    public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, 
boolean isConflictFree, boolean addShadowCells) throws SQLException  {
+        assert(ctx instanceof OmidTransactionContext);
+
+        OmidTransactionContext omidTransactionContext = 
(OmidTransactionContext) ctx;
+        this.addShadowCells = addShadowCells;
+        try {
+            tTable = new TTable(hTable, true, isConflictFree);
+        } catch (IOException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+            .setMessage(e.getMessage()).setRootCause(e).build()
+            .buildException();
+        }
+
+        this.tx = omidTransactionContext.getTransaction();
     }
 
     @Override
     public Result get(Get get) throws IOException {
-        return null;
+        return tTable.get(tx, get);
     }
 
     @Override
     public void put(Put put) throws IOException {
+        tTable.put(tx, put, addShadowCells);
     }
 
     @Override
     public void delete(Delete delete) throws IOException {
+        tTable.delete(tx, delete);
     }
 
     @Override
     public ResultScanner getScanner(Scan scan) throws IOException {
-        return null;
+        scan.setTimeRange(0, Long.MAX_VALUE);
+        return tTable.getScanner(tx, scan);
     }
 
     @Override
     public Configuration getConfiguration() {
-        return null;
+        return tTable.getConfiguration();
     }
 
     @Override
     public HTableDescriptor getTableDescriptor() throws IOException {
-        return null;
+        return tTable.getTableDescriptor();
     }
 
     @Override
     public boolean exists(Get get) throws IOException {
-        return false;
+       return tTable.exists(tx, get);
     }
 
     @Override
     public Result[] get(List<Get> gets) throws IOException {
-        return null;
+        return tTable.get(tx, gets);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family) throws IOException {
-        return null;
+        return tTable.getScanner(tx, family);
     }
 
     @Override
     public ResultScanner getScanner(byte[] family, byte[] qualifier)
             throws IOException {
-        return null;
+        return tTable.getScanner(tx, family, qualifier);
     }
 
     @Override
     public void put(List<Put> puts) throws IOException {
+        tTable.put(tx, puts, addShadowCells);
     }
 
     @Override
     public void delete(List<Delete> deletes) throws IOException {
+        tTable.delete(tx, deletes);
     }
 
     @Override
     public void close() throws IOException {
+        tTable.close();
     }
 
     @Override
     public TableName getName() {
-        return null;
+        byte[] name = tTable.getTableName();
+        return TableName.valueOf(name);
     }
 
     @Override
@@ -135,6 +175,8 @@ public class OmidTransactionTable implements Table {
     @Override
     public void batch(List<? extends Row> actions, Object[] results)
             throws IOException, InterruptedException {
+        tTable.batch(tx, actions, addShadowCells);
+        Arrays.fill(results, EMPTY_RESULT_EXISTS_TRUE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 3e9182f..0f10b37 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -22,10 +22,11 @@ import java.io.IOException;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 
 
+
 public class TransactionFactory {
     public enum Provider {
         TEPHRA((byte)1, TephraTransactionProvider.getInstance(), true),
-        OMID((byte)2, OmidTransactionProvider.getInstance(), false);
+        OMID((byte)2, OmidTransactionProvider.getInstance(), true);
         
         private final byte code;
         private final PhoenixTransactionProvider provider;
@@ -49,7 +50,7 @@ public class TransactionFactory {
         }
         
         public static Provider getDefault() {
-            return TEPHRA;
+            return OMID;
         }
 
         public PhoenixTransactionProvider getTransactionProvider()  {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 2049390..59e7fd3 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -22,7 +22,6 @@ import static 
org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
 import org.apache.curator.shaded.com.google.common.io.Files;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
-import org.apache.phoenix.transaction.OmidTransactionProvider;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/412e0789/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 571aa5c..4f67b98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,7 @@
     <avatica.version>1.12.0</avatica.version>
     <jetty.version>9.3.19.v20170502</jetty.version>
     <tephra.version>0.15.0-incubating</tephra.version>
+    <omid.version>1.0.0</omid.version>
     <spark.version>2.3.0</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>

Reply via email to