fix some more bugs and clean code
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f5f86341 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f5f86341 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f5f86341 Branch: refs/heads/omid Commit: f5f86341a7eb4a721a3e924817875ed7c676e6a0 Parents: f584e5f Author: Ohad Shacham <[email protected]> Authored: Sun Apr 9 17:36:10 2017 +0300 Committer: Ohad Shacham <[email protected]> Committed: Sun Apr 9 17:36:10 2017 +0300 ---------------------------------------------------------------------- .../phoenix/tx/FlappingTransactionIT.java | 3 - .../org/apache/phoenix/tx/TransactionIT.java | 16 +- .../org/apache/phoenix/tx/TxCheckpointIT.java | 14 +- .../coprocessor/BaseScannerRegionObserver.java | 1 - .../PhoenixTransactionalProcessor.java | 4 +- .../phoenix/coprocessor/ScanRegionObserver.java | 1 - .../UngroupedAggregateRegionObserver.java | 6 +- .../apache/phoenix/execute/MutationState.java | 3 + .../apache/phoenix/jdbc/PhoenixConnection.java | 1 - .../apache/phoenix/schema/MetaDataClient.java | 4 +- .../transaction/OmidTransactionContext.java | 13 ++ .../transaction/PhoenixTransactionContext.java | 20 +- .../transaction/TephraTransactionContext.java | 224 +++++++++---------- .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 4 +- .../apache/phoenix/util/TransactionUtil.java | 1 - 16 files changed, 166 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java index d34f403..0bc7c24 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java @@ -47,10 +47,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionalTable; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.TransactionAwareHTable; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 1399f6c..ff2bf6b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -53,11 +53,11 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; -import org.apache.tephra.TxConstants; import org.junit.Ignore; import org.junit.Test; @@ -374,21 +374,21 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + assertEquals(1000, Integer.parseInt(colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL))); } desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + assertEquals(1000, Integer.parseInt(colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL))); } desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + assertEquals(1000, Integer.parseInt(colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL))); } conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)"); @@ -397,14 +397,14 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(10, colDesc.getMaxVersions()); assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive()); - assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL)); + assertEquals(null, colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TTL=1000"); desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2")); for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(10, colDesc.getMaxVersions()); assertEquals(1000, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + assertEquals(1000, Integer.parseInt(colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL))); } conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)"); @@ -434,7 +434,7 @@ public class TransactionIT extends ParallelStatsDisabledIT { for (HColumnDescriptor colDesc : desc.getFamilies()) { assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions()); assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive()); - assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL))); + assertEquals(1000, Integer.parseInt(colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL))); } } @@ -466,7 +466,7 @@ public class TransactionIT extends ParallelStatsDisabledIT { admin.createTable(desc); ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); + assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(PhoenixTransactionContext.READ_NON_TX_DATA)); // Should be ok, as HBase metadata should match existing metadata. ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 246ecd4..aac9586 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -36,9 +36,9 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.Transaction.VisibilityLevel; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -254,7 +254,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { long wp = state.getWritePointer(); conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertEquals(wp, state.getWritePointer()); // Make sure write ptr // didn't move @@ -266,7 +266,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr // moves @@ -279,7 +279,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { conn.createStatement().execute( "upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + ""); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr // moves @@ -318,7 +318,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { state.startTransaction(); long wp = state.getWritePointer(); conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1"); - assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel()); assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); @@ -336,7 +336,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); @@ -353,7 +353,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)"); conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))"); - assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); + assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel()); assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 321d117..705af86 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -58,7 +58,6 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import org.apache.tephra.Transaction; import com.google.common.collect.ImmutableList; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java index 8693681..37fa2ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java @@ -17,12 +17,12 @@ */ package org.apache.phoenix.coprocessor; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.phoenix.transaction.TransactionFactory; public class PhoenixTransactionalProcessor extends DelegateRegionObserver { public PhoenixTransactionalProcessor() { - super(new TransactionProcessor()); + super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 0e0e3ba..6f7198e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -64,7 +64,6 @@ import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import org.apache.tephra.Transaction; import com.google.common.collect.Lists; import com.google.common.collect.Sets; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index db3c792..9e4f39d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -108,6 +108,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -116,7 +117,6 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TimeKeeper; -import org.apache.tephra.TxConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -560,7 +560,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver firstKV.getRowOffset(), firstKV.getRowLength(),ts); mutations.add(delete); // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); int bucketNumOffset = 0; @@ -624,7 +624,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver results.get(0).getRowLength()); delete.deleteColumns(deleteCF, deleteCQ, ts); // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); mutations.add(delete); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 23c8b2a..8e26bdc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -282,6 +282,9 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.checkpoint(hasUncommittedData); + if (hasUncommittedData) { + uncommittedPhysicalNames.clear(); + } return true; } return false; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index d387ab7..288277f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -113,7 +113,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.SchemaUtil; -import org.apache.tephra.TransactionContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index cc2b5b9..42d1431 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -199,6 +199,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; @@ -211,7 +212,6 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TransactionUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.apache.tephra.TxConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1847,7 +1847,7 @@ public class MetaDataClient { // If TTL set, use Tephra TTL property name instead Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL); if (ttl != null) { - commonFamilyProps.put(TxConstants.PROPERTY_TTL, ttl); + commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, ttl); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 8a4e284..d122d0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -3,6 +3,7 @@ package org.apache.phoenix.transaction; import java.sql.SQLException; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.phoenix.schema.PTable; import org.slf4j.Logger; @@ -103,4 +104,16 @@ public class OmidTransactionContext implements PhoenixTransactionContext { // TODO Auto-generated method stub return 0; } + + @Override + public boolean isPreExistingVersion(long version) { + // TODO Auto-generated method stub + return false; + } + + @Override + public BaseRegionObserver getCoProcessor() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index bd63930..0854f4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -1,5 +1,6 @@ package org.apache.phoenix.transaction; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.phoenix.schema.PTable; import org.slf4j.Logger; @@ -19,7 +20,11 @@ public interface PhoenixTransactionContext { SNAPSHOT_ALL } - public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "phoenix.tx.rollback"; + public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback"; //"phoenix.tx.rollback"; + + public static final String PROPERTY_TTL = "dataset.table.ttl"; + + public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; /** * Starts a transaction @@ -120,4 +125,17 @@ public interface PhoenixTransactionContext { * @return max transactions per second */ public long getMaxTransactionsPerSecond(); + + /** + * + * @param version + * @return + */ + public boolean isPreExistingVersion(long version); + + /** + * + * @return the coprocessor + */ + public BaseRegionObserver getCoProcessor(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index cfa3ac3..a5e6e64 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -25,7 +26,9 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.util.TxUtils; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; @@ -51,19 +54,23 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public TephraTransactionContext(byte[] txnBytes) throws IOException { this(); - this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC.decode(txnBytes) : null; + this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC + .decode(txnBytes) : null; } public TephraTransactionContext(PhoenixConnection connection) { - this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); + this.txServiceClient = connection.getQueryServices() + .getTransactionSystemClient(); this.txAwares = Collections.emptyList(); this.txContext = new TransactionContext(txServiceClient); } - public TephraTransactionContext(PhoenixTransactionContext ctx, PhoenixConnection connection, boolean subTask) { - this.txServiceClient = connection.getQueryServices().getTransactionSystemClient(); + public TephraTransactionContext(PhoenixTransactionContext ctx, + PhoenixConnection connection, boolean subTask) { + this.txServiceClient = connection.getQueryServices() + .getTransactionSystemClient(); - assert(ctx instanceof TephraTransactionContext); + assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; if (subTask) { @@ -81,51 +88,53 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void begin() throws SQLException { if (txContext == null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build() + .buildException(); } - System.out.println("BEGIN"); try { txContext.start(); } catch (TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); } } @Override public void commit() throws SQLException { - + if (txContext == null || !isTransactionRunning()) { return; } - + try { txContext.finish(); } catch (TransactionFailureException e) { this.e = e; + if (e instanceof TransactionConflictException) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); } - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); } } @Override public void abort() throws SQLException { - + if (txContext == null || !isTransactionRunning()) { return; } - + try { if (e != null) { txContext.abort(e); @@ -135,10 +144,10 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } catch (TransactionFailureException e) { this.e = null; - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED) - .setMessage(e.getMessage()) - .setRootCause(e) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); } } @@ -148,8 +157,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { try { if (txContext == null) { tx = txServiceClient.checkpoint(tx); - } else { - assert(txContext != null); + } else { + assert (txContext != null); txContext.checkpoint(); tx = txContext.getCurrentTransaction(); } @@ -159,44 +168,43 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } // Since we're querying our own table while mutating it, we must exclude - // see our current mutations, otherwise we can get erroneous results (for DELETE) + // see our current mutations, otherwise we can get erroneous results + // (for DELETE) // or get into an infinite loop (for UPSERT SELECT). if (txContext == null) { tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } else { + assert (txContext != null); + txContext.getCurrentTransaction().setVisibility( + VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); } - else { - assert(txContext != null); - txContext.getCurrentTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); - } - } - - private Transaction getCurrentTransaction() { - if (this.txContext != null) { - return this.txContext.getCurrentTransaction(); - } - - return this.tx; } @Override - public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { + public void commitDDLFence(PTable dataTable, Logger logger) + throws SQLException { byte[] key = dataTable.getName().getBytes(); try { - FenceWait fenceWait = VisibilityFence.prepareWait(key, txServiceClient); + FenceWait fenceWait = VisibilityFence.prepareWait(key, + txServiceClient); fenceWait.await(10000, TimeUnit.MILLISECONDS); - + if (logger.isInfoEnabled()) { - logger.info("Added write fence at ~" + getCurrentTransaction().getReadPointer()); + logger.info("Added write fence at ~" + + getCurrentTransaction().getReadPointer()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e) + .build().buildException(); } catch (TimeoutException | TransactionFailureException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) - .setSchemaName(dataTable.getSchemaName().getString()) - .setTableName(dataTable.getTableName().getString()) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()).build() + .buildException(); } } @@ -213,7 +221,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { byte[] physicalKey = table.getPhysicalName().getBytes(); if (Bytes.compareTo(physicalKey, logicalKey) != 0) { - TransactionAware physicalTxAware = VisibilityFence.create(physicalKey); + TransactionAware physicalTxAware = VisibilityFence + .create(physicalKey); if (this.txContext == null) { this.txAwares.add(physicalTxAware); } else { @@ -224,7 +233,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void join(PhoenixTransactionContext ctx) { - assert(ctx instanceof TephraTransactionContext); + assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; if (txContext != null) { @@ -236,70 +245,51 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } + private Transaction getCurrentTransaction() { + return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; + } + @Override public boolean isTransactionRunning() { - if (this.txContext != null) { - return (this.txContext.getCurrentTransaction() != null); - } - - if (this.tx != null) { - return true; - } - - return false; + return getCurrentTransaction() != null; } @Override public void reset() { tx = null; txAwares.clear(); + this.e = null; } @Override public long getTransactionId() { - if (this.txContext != null) { - return txContext.getCurrentTransaction().getTransactionId(); - } - - if (tx != null) { - return tx.getTransactionId(); - } - - return HConstants.LATEST_TIMESTAMP; + Transaction tx = getCurrentTransaction(); + return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing } @Override public long getReadPointer() { - if (this.txContext != null) { - return txContext.getCurrentTransaction().getReadPointer(); - } + Transaction tx = getCurrentTransaction(); - if (tx != null) { - return tx.getReadPointer(); + if (tx == null) { + return (-1); } - return (-1); + return tx.getReadPointer(); } // For testing @Override public long getWritePointer() { - if (this.txContext != null) { - return txContext.getCurrentTransaction().getWritePointer(); - } - - if (tx != null) { - return tx.getWritePointer(); - } - - return HConstants.LATEST_TIMESTAMP; + Transaction tx = getCurrentTransaction(); + return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer(); } @Override public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { VisibilityLevel tephraVisibilityLevel = null; - switch(visibilityLevel) { + switch (visibilityLevel) { case SNAPSHOT: tephraVisibilityLevel = VisibilityLevel.SNAPSHOT; break; @@ -307,34 +297,29 @@ public class TephraTransactionContext implements PhoenixTransactionContext { tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; break; case SNAPSHOT_ALL: + System.out.println("OHAD Move to SNAPSHOT_ALL "); + System.out.flush(); tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL; break; default: - assert(false); + assert (false); } - if (this.txContext != null) { - txContext.getCurrentTransaction().setVisibility(tephraVisibilityLevel); - } else if (tx != null) { - tx.setVisibility(tephraVisibilityLevel); - } else { - assert(false); - } + Transaction tx = getCurrentTransaction(); + assert(tx != null); + tx.setVisibility(tephraVisibilityLevel); } - - // For testing + @Override public PhoenixVisibilityLevel getVisibilityLevel() { VisibilityLevel visibilityLevel = null; - if (this.txContext != null) { - visibilityLevel = txContext.getCurrentTransaction().getVisibilityLevel(); - } else if (tx != null) { - visibilityLevel = tx.getVisibilityLevel(); - } + Transaction tx = getCurrentTransaction(); + assert(tx != null); + visibilityLevel = tx.getVisibilityLevel(); PhoenixVisibilityLevel phoenixVisibilityLevel; - switch(visibilityLevel) { + switch (visibilityLevel) { case SNAPSHOT: phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT; break; @@ -352,36 +337,37 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public byte[] encodeTransaction() throws SQLException { - - Transaction transaction = null; - - if (this.txContext != null) { - transaction = txContext.getCurrentTransaction(); - } else if (tx != null) { - transaction = tx; - } - - assert (transaction != null); + Transaction tx = getCurrentTransaction(); + assert (tx != null); try { - return CODEC.encode(transaction); + return CODEC.encode(tx); } catch (IOException e) { throw new SQLException(e); } } - + @Override public long getMaxTransactionsPerSecond() { return TxConstants.MAX_TX_PER_MS; } + @Override + public boolean isPreExistingVersion(long version) { + return TxUtils.isPreExistingVersion(version); + } + + @Override + public BaseRegionObserver getCoProcessor() { + return new TransactionProcessor(); + } /** - * TephraTransactionContext specific functions - */ + * TephraTransactionContext specific functions + */ Transaction getTransaction() { - return this.tx; + return this.getCurrentTransaction(); } TransactionContext getContext() { @@ -397,7 +383,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { txContext.addTransactionAware(txAware); } else if (this.tx != null) { txAwares.add(txAware); - assert(tx != null); + assert (tx != null); txAware.startTx(tx); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 4a9cb57..2bab0b0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -104,7 +104,7 @@ import org.apache.phoenix.schema.types.PDecimal; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; -import org.apache.tephra.TxConstants; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; @@ -261,7 +261,7 @@ public class IndexUtil { regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); } Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<KeyValue>emptyList(), ts, regionStartKey, regionEndkey); - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY)); + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY)); indexMutations.add(delete); } return indexMutations; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 5bfb55d..ca286d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -85,7 +85,7 @@ import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.types.PDataType; -import org.apache.tephra.util.TxUtils; +import org.apache.phoenix.transaction.TransactionFactory; import com.google.common.base.Joiner; import com.google.common.base.Splitter; @@ -1433,7 +1433,7 @@ public class PhoenixRuntime { * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp. */ public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) { - return TxUtils.isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); + return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); } public static long getCurrentScn(ReadOnlyProps props) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f5f86341/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 94a56b8..0a55147 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -33,7 +33,6 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionalTable; import org.apache.phoenix.transaction.TephraTransactionTable; import org.apache.phoenix.transaction.TransactionFactory; -import org.apache.tephra.TxConstants; public class TransactionUtil { private TransactionUtil() {
