Repository: phoenix Updated Branches: refs/heads/txn 433495482 -> 714284d32
Fix TupleProjector to call getTableName instead of getName, make equality of TableRef ignore alias if either is null, cache Transaction on context for querying Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/714284d3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/714284d3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/714284d3 Branch: refs/heads/txn Commit: 714284d32a4937015e97e6efdb0556b5990313f2 Parents: 4334954 Author: James Taylor <jtay...@salesforce.com> Authored: Sun Apr 19 23:12:50 2015 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sun Apr 19 23:12:50 2015 -0700 ---------------------------------------------------------------------- .../phoenix/transactions/TransactionIT.java | 59 ++++++-- .../phoenix/compile/StatementContext.java | 11 ++ .../compile/TupleProjectionCompiler.java | 2 +- .../apache/phoenix/compile/UpsertCompiler.java | 9 ++ .../apache/phoenix/execute/CommitException.java | 9 +- .../apache/phoenix/execute/MutationState.java | 138 +++++++++++-------- .../phoenix/iterate/TableResultIterator.java | 6 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 27 ++-- .../query/ConnectionQueryServicesImpl.java | 1 - .../org/apache/phoenix/schema/TableRef.java | 8 +- .../java/org/apache/phoenix/query/BaseTest.java | 2 - 11 files changed, 171 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java index 807234d..31adcb9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java +++ 
b/phoenix-core/src/it/java/org/apache/phoenix/transactions/TransactionIT.java @@ -25,15 +25,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; +import co.cask.tephra.TxConstants; + import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -42,19 +38,43 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import co.cask.tephra.TxConstants; - import com.google.common.collect.Maps; public class TransactionIT extends BaseHBaseManagedTimeIT { private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; - @Before + @Before public void setUp() throws SQLException { - ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE); + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.createStatement().execute( + "create table "+ FULL_TABLE_NAME + "(" + + " varchar_pk VARCHAR NOT NULL, " + + " char_pk CHAR(6) NOT NULL, " + + " int_pk INTEGER NOT NULL, " + + " long_pk BIGINT NOT NULL, " + + " decimal_pk DECIMAL(31, 10) NOT NULL, " + + " date_pk DATE NOT NULL, " + + " a.varchar_col1 VARCHAR, " + + " a.char_col1 CHAR(10), " + + " a.int_col1 INTEGER, " + + " a.long_col1 BIGINT, " + + " a.decimal_col1 DECIMAL(31, 10), " + + " a.date1 DATE, " + + " b.varchar_col2 VARCHAR, " + + " b.char_col2 CHAR(10), " + + " b.int_col2 INTEGER, " + + " b.long_col2 BIGINT, " + + " b.decimal_col2 DECIMAL(31, 10), " + + " b.date2 DATE " + + 
" CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) " + + "TRANSACTIONAL=true"); + } finally { + conn.close(); + } } - + @BeforeClass @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { @@ -122,18 +142,31 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } @Test - public void testAutocommitQueryEmptyTable() throws Exception { + public void testAutoCommitQuerySingleTable() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); try { conn.setAutoCommit(true); // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM "+FULL_TABLE_NAME); + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME); assertFalse(rs.next()); } finally { conn.close(); } } + @Test + public void testAutoCommitQueryMultiTables() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + try { + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)"); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + @Test public void testColConflicts() throws Exception { Connection conn1 = DriverManager.getConnection(getUrl()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index d726488..08ba1b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -26,6 +26,8 @@ import java.util.Map; 
import java.util.Set; import java.util.TimeZone; +import co.cask.tephra.Transaction; + import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; @@ -78,6 +80,7 @@ public class StatementContext { private TableRef currentTable; private List<Pair<byte[], byte[]>> whereConditionColumns; private TimeRange scanTimeRange = null; + private Transaction transaction; private Map<SelectStatement, Object> subqueryResults; @@ -285,4 +288,12 @@ public class StatementContext { public void setSubqueryResult(SelectStatement select, Object result) { subqueryResults.put(select, result); } + + public Transaction getTransaction() { + return transaction; + } + + public void setTransaction(Transaction transaction) { + this.transaction = transaction; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index b41107b..d15ee7f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -146,7 +146,7 @@ public class TupleProjectionCompiler { projectedColumns.add(column); } - return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getName(), PTableType.PROJECTED, + return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getTableName(), PTableType.PROJECTED, table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), projectedColumns, table.getParentSchemaName(), table.getParentName(), table.getIndexes(), table.isImmutableRows(), 
Collections.<PName>emptyList(), null, null, http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 67d289e..8e38ffc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -477,6 +477,7 @@ public class UpsertCompiler { break; } + final QueryPlan originalQueryPlan = queryPlanToBe; RowProjector projectorToBe = null; // Optimize only after all checks have been performed if (valueNodes == null) { @@ -674,6 +675,14 @@ public class UpsertCompiler { @Override public MutationState execute() throws SQLException { + // Repeated from PhoenixStatement.executeQuery which this call bypasses. + // Send mutations to hbase, so they are visible to subsequent reads. + // Use original plan for data table so that data and immutable indexes will be sent. + boolean isTransactional = connection.getMutationState().startTransaction(originalQueryPlan.getContext().getResolver().getTables().iterator()); + if (isTransactional) { + // Use real query plan so that we have the right context object. 
+ queryPlan.getContext().setTransaction(connection.getMutationState().getTransaction()); + } ResultIterator iterator = queryPlan.iterator(); if (parallelIteratorFactory == null) { return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes); http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java index 63bf6a1..6cbb06d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java @@ -22,20 +22,13 @@ import java.sql.SQLException; public class CommitException extends SQLException { private static final long serialVersionUID = 1L; private final MutationState uncommittedState; - private final MutationState committedState; - public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) { + public CommitException(Exception e, MutationState uncommittedState) { super(e); this.uncommittedState = uncommittedState; - this.committedState = committedState; } public MutationState getUncommittedState() { return uncommittedState; } - - public MutationState getCommittedState() { - return committedState; - } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index a0cf8d2..e2b6968 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -72,6 +72,7 @@ import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -140,20 +141,6 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - private MutationState(MutationState state, List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries) { - this.maxSize = state.maxSize; - this.connection = state.connection; - this.sizeOffset = state.sizeOffset; - this.tx = state.tx; - this.txAwares = state.txAwares; - this.txContext = state.txContext; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { - numRows += entry.getValue().size(); - this.mutations.put(entry.getKey(), entry.getValue()); - } - throwIfTooBig(); - } - private void addTxParticipant(TransactionAware txAware) throws SQLException { if (txContext == null) { txAwares.add(txAware); @@ -168,7 +155,7 @@ public class MutationState implements SQLCloseable { return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null; } - public void startTransaction() throws SQLException { + public boolean startTransaction() throws SQLException { if (txContext == null) { throw new SQLException("No transaction context"); // TODO: error code } @@ -177,10 +164,12 @@ public class MutationState implements SQLCloseable { if (!txStarted) { txContext.start(); txStarted = true; + return true; } } catch (TransactionFailureException e) { throw new SQLException(e); // TODO: error code } + return false; } private void throwIfTooBig() { @@ -372,46 +361,50 @@ public class MutationState implements SQLCloseable { * @return the server time to use for the upsert * @throws SQLException if the table or any columns no longer exist */ - private long[] validate() throws SQLException { + private long[] validateAll() throws SQLException { int i = 0; - Long scn = connection.getSCN(); - MetaDataClient client = new MetaDataClient(connection); long[] timeStamps = new long[this.mutations.size()]; for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { TableRef tableRef = entry.getKey(); - long serverTimeStamp = tableRef.getTimeStamp(); - PTable table = tableRef.getTable(); - // If we're auto committing, we've already validated the schema when we got the ColumnResolver, - // so no need to do it again here. - if (!connection.getAutoCommit()) { - MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); - long timestamp = result.getMutationTime(); - if (timestamp != QueryConstants.UNSET_TIMESTAMP) { - serverTimeStamp = timestamp; - if (result.wasUpdated()) { - // TODO: use bitset? 
- table = result.getTable(); - PColumn[] columns = new PColumn[table.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { - Map<PColumn,byte[]> valueEntry = rowEntry.getValue(); - if (valueEntry != PRow.DELETE_MARKER) { - for (PColumn column : valueEntry.keySet()) { - columns[column.getPosition()] = column; - } + timeStamps[i++] = validate(tableRef, entry.getValue()); + } + return timeStamps; + } + + private long validate(TableRef tableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> values) throws SQLException { + Long scn = connection.getSCN(); + MetaDataClient client = new MetaDataClient(connection); + long serverTimeStamp = tableRef.getTimeStamp(); + PTable table = tableRef.getTable(); + // If we're auto committing, we've already validated the schema when we got the ColumnResolver, + // so no need to do it again here. + if (!connection.getAutoCommit()) { + MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); + long timestamp = result.getMutationTime(); + if (timestamp != QueryConstants.UNSET_TIMESTAMP) { + serverTimeStamp = timestamp; + if (result.wasUpdated()) { + // TODO: use bitset? 
+ table = result.getTable(); + PColumn[] columns = new PColumn[table.getColumns().size()]; + for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : values.entrySet()) { + Map<PColumn,byte[]> valueEntry = rowEntry.getValue(); + if (valueEntry != PRow.DELETE_MARKER) { + for (PColumn column : valueEntry.keySet()) { + columns[column.getPosition()] = column; } } - for (PColumn column : columns) { - if (column != null) { - table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); - } + } + for (PColumn column : columns) { + if (column != null) { + table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); } - tableRef.setTable(table); } + tableRef.setTable(table); } } - timeStamps[i++] = scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; } - return timeStamps; + return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn; } private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) { @@ -429,25 +422,31 @@ public class MutationState implements SQLCloseable { } @SuppressWarnings("deprecation") - public void send() throws SQLException { + private void send(Iterator<TableRef> tableRefs) throws SQLException { int i = 0; + long[] serverTimeStamps = null; byte[] tenantId = connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes(); - long[] serverTimeStamps = validate(); - Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); - List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size()); + // Validate up front if not transactional so that we fail before any mutations are sent (a non-transactional send cannot be undone) + if (tableRefs == null) { + serverTimeStamps = validateAll(); + tableRefs = mutations.keySet().iterator(); + } // add tracing for this operation TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); Span span = trace.getSpan(); - while (iterator.hasNext()) { - Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next(); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue(); - TableRef tableRef = entry.getKey(); + while (tableRefs.hasNext()) { + TableRef tableRef = tableRefs.next(); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = mutations.get(tableRef); + if (valuesMap == null) { + continue; + } PTable table = tableRef.getTable(); table.getIndexMaintainers(tempPtr, connection); boolean hasIndexMaintainers = tempPtr.getLength() > 0; boolean isDataTable = true; - long serverTimestamp = serverTimeStamps[i++]; + // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) + long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++]; Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false); while (mutationsIterator.hasNext()) { Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); @@ -518,7 +517,6 @@ public class MutationState implements SQLCloseable { MUTATION_COMMIT_TIME.update(duration); shouldRetry = false; if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection)); - committedList.add(entry); } catch (Exception e) { SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); if (inferredE != null) { @@ -541,7 +539,7 @@ public class MutationState implements SQLCloseable { } // Throw to client with both what was committed so far and what is left to be committed. // That way, client can either undo what was done or try again with what was not done. - sqlE = new CommitException(e, this, new MutationState(this, committedList)); + sqlE = new CommitException(e, this); } finally { try { if (cache != null) { @@ -567,9 +565,9 @@ public class MutationState implements SQLCloseable { isDataTable = false; } if (tableRef.getTable().getType() != PTableType.INDEX) { - numRows -= entry.getValue().size(); + numRows -= valuesMap.size(); } - iterator.remove(); // Remove batches as we process them + valuesMap.remove(tableRef); // Remove batches as we process them } trace.close(); assert(numRows==0); @@ -624,4 +622,30 @@ public class MutationState implements SQLCloseable { } } } + + /** + * Send mutations to hbase, so they are visible to subsequent reads, + * starting a transaction if transactional and one has not yet been started. + * @param tableRefs + * @return true if at least partially transactional and false otherwise. 
+ * @throws SQLException + */ + public boolean startTransaction(Iterator<TableRef> tableRefs) throws SQLException { + Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){ + @Override + public boolean apply(TableRef tableRef) { + return tableRef.getTable().isTransactional(); + } + }); + if (filteredTableRefs.hasNext()) { + startTransaction(); + send(filteredTableRefs); + return true; + } + return false; + } + + public void send() throws SQLException { + send(null); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 9cece1c..ad95ef9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -87,9 +87,11 @@ public class TableResultIterator extends ExplainTable implements ResultIterator this.scan = scan; PTable table = tableRef.getTable(); HTableInterface htable = context.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); - if (table.isTransactional()) { + Transaction tx; + if (table.isTransactional() && (tx=context.getTransaction()) != null) { TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable); - Transaction tx = context.getConnection().getMutationState().getTransaction(); + // Use transaction cached on context as we may have started a new transaction already + // if auto commit is true. 
txAware.startTx(tx); htable = txAware; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index fb295d3..ef2a233 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -234,19 +234,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho final long startTime = System.currentTimeMillis(); try { QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); - startTransaction(plan); - plan = connection.getQueryServices().getOptimizer().optimize( - PhoenixStatement.this, plan); + // Send mutations to hbase, so they are visible to subsequent reads. 
+ // Use original plan for data table so that data and immutable indexes will be sent + boolean isTransactional = connection.getMutationState().startTransaction(plan.getContext().getResolver().getTables().iterator()); + plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan); + if (isTransactional) { + // After optimize so that we have the right context object + plan.getContext().setTransaction(connection.getMutationState().getTransaction()); + } // this will create its own trace internally, so we don't wrap this // whole thing in tracing ResultIterator resultIterator = plan.iterator(); - if (connection.getAutoCommit()) { - connection.commit(); // Forces new read point for next statement - } - else { - // send mutations to hbase, so they are visible to subsequent reads - connection.getMutationState().send(); - } if (logger.isDebugEnabled()) { String explainPlan = QueryUtil.getExplainPlan(resultIterator); logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection)); @@ -257,6 +255,10 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); setLastUpdateOperation(stmt.getOperation()); + // If transactional, this will move the read pointer forward + if (connection.getAutoCommit()) { + connection.commit(); + } return rs; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException @@ -279,13 +281,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } } - public void startTransaction(StatementPlan plan) throws SQLException { + private boolean startTransaction(StatementPlan plan) throws SQLException { for (TableRef ref : plan.getContext().getResolver().getTables()) { if (ref.getTable().isTransactional()) { connection.getMutationState().startTransaction(); - break; + return true; } } + return false; } protected int executeMutation(final CompilableStatement stmt) throws 
SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 01cf8b2..1655a57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2069,7 +2069,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public MutationState updateData(MutationPlan plan) throws SQLException { - plan.getContext().getStatement().startTransaction(plan); return plan.execute(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java index bd88770..b5351bf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java @@ -97,10 +97,7 @@ public class TableRef { @Override public int hashCode() { - final int prime = 31; - int result = alias == null ? 
0 : alias.hashCode(); - result = prime * result + this.table.getName().getString().hashCode(); - return result; + return this.table.getName().getString().hashCode(); } @Override @@ -109,7 +106,8 @@ public class TableRef { if (obj == null) return false; if (getClass() != obj.getClass()) return false; TableRef other = (TableRef)obj; - if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false; + // If alias is null, it matches any other alias + if (alias != null && other.alias != null && !alias.equals(other.alias)) return false; if (!table.getName().getString().equals(other.table.getName().getString())) return false; return true; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/714284d3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 03f7d0f..4ab438b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -81,7 +81,6 @@ import static org.apache.phoenix.util.TestUtil.STABLE_NAME; import static org.apache.phoenix.util.TestUtil.TABLE_WITH_ARRAY; import static org.apache.phoenix.util.TestUtil.TABLE_WITH_SALTING; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -418,7 +417,6 @@ public abstract class BaseTest { " kv bigint)\n"); builder.put(INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + INDEX_DATA_TABLE + TEST_TABLE_SCHEMA + "IMMUTABLE_ROWS=true"); builder.put(MUTABLE_INDEX_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + 
QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA); - builder.put(TRANSACTIONAL_DATA_TABLE, "create table " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE + TEST_TABLE_SCHEMA + "TRANSACTIONAL=true"); builder.put("SumDoubleTest","create table SumDoubleTest" + " (id varchar not null primary key, d DOUBLE, f FLOAT, ud UNSIGNED_DOUBLE, uf UNSIGNED_FLOAT, i integer, de decimal)"); builder.put(JOIN_ORDER_TABLE_FULL_NAME, "create table " + JOIN_ORDER_TABLE_FULL_NAME +