Repository: phoenix Updated Branches: refs/heads/3.0 bf395def6 -> c3067a754
PHOENIX-1034 Move validate/reserve of sequences into query compile Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c3067a75 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c3067a75 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c3067a75 Branch: refs/heads/3.0 Commit: c3067a754541e9d315960200f0136bf696fd2db5 Parents: bf395de Author: James Taylor <jtay...@salesforce.com> Authored: Sun Jun 8 12:47:53 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sun Jun 8 12:59:33 2014 -0700 ---------------------------------------------------------------------- .../apache/phoenix/compile/SequenceManager.java | 2 +- .../coprocessor/SequenceRegionObserver.java | 13 ++-- .../phoenix/jdbc/PhoenixPreparedStatement.java | 5 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 62 +++++++++++--------- .../apache/phoenix/parse/BindableStatement.java | 2 - .../apache/phoenix/parse/DeleteStatement.java | 6 -- .../apache/phoenix/parse/ExplainStatement.java | 6 -- .../apache/phoenix/parse/MutableStatement.java | 6 -- .../apache/phoenix/parse/SelectStatement.java | 6 -- .../phoenix/query/ConnectionQueryServices.java | 2 +- .../query/ConnectionQueryServicesImpl.java | 6 +- .../query/ConnectionlessQueryServicesImpl.java | 4 +- .../query/DelegateConnectionQueryServices.java | 2 +- .../org/apache/phoenix/schema/Sequence.java | 15 ++--- .../phoenix/pig/hadoop/PhoenixInputFormat.java | 5 -- 15 files changed, 59 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java index a5f37f8..8e71c3b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java @@ -138,7 +138,7 @@ public class SequenceManager { return expression; } - public void validateSequences(Sequence.Action action) throws SQLException { + public void validateSequences(Sequence.ValueOp action) throws SQLException { if (sequenceMap == null || sequenceMap.isEmpty()) { return; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 46834cf..875bb0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -67,7 +67,6 @@ import org.apache.phoenix.util.ServerUtil; * @since 3.0.0 */ public class SequenceRegionObserver extends BaseRegionObserver { - public enum Op {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE}; public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION"; public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE"; public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE"; @@ -114,7 +113,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { byte[] cf = entry.getKey(); for (Map.Entry<byte[],Long> kvEntry : entry.getValue().entrySet()) { get.addColumn(cf, kvEntry.getKey()); - validateOnly &= (Sequence.Action.VALIDATE.ordinal() == kvEntry.getValue().intValue()); + validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == kvEntry.getValue().intValue()); } } Result result = region.get(get); @@ -167,7 +166,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { if (opBuf == null) { return null; } - Op op = Op.values()[opBuf[0]]; + Sequence.MetaOp op = Sequence.MetaOp.values()[opBuf[0]]; KeyValue keyValue = append.getFamilyMap().values().iterator().next().iterator().next(); long clientTimestamp = HConstants.LATEST_TIMESTAMP; @@ -175,7 +174,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { long maxGetTimestamp = HConstants.LATEST_TIMESTAMP; boolean hadClientTimestamp; byte[] clientTimestampBuf = null; - if (op == Op.RETURN_SEQUENCE) { + if (op == Sequence.MetaOp.RETURN_SEQUENCE) { // When returning sequences, this allows us to send the expected timestamp // of the sequence to make sure we don't reset any other sequence hadClientTimestamp = true; @@ -191,7 +190,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { // Prevent race condition of creating two sequences at the same timestamp // by looking for a sequence at or after the timestamp at which it'll be // created. - if (op == Op.CREATE_SEQUENCE) { + if (op == Sequence.MetaOp.CREATE_SEQUENCE) { maxGetTimestamp = clientTimestamp + 1; } } else { @@ -218,11 +217,11 @@ public class SequenceRegionObserver extends BaseRegionObserver { get.addColumn(family, qualifier); Result result = region.get(get); if (result.isEmpty()) { - if (op == Op.DROP_SEQUENCE || op == Op.RETURN_SEQUENCE) { + if (op == Sequence.MetaOp.DROP_SEQUENCE || op == Sequence.MetaOp.RETURN_SEQUENCE) { return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode()); } } else { - if (op == Op.CREATE_SEQUENCE) { + if (op == Sequence.MetaOp.CREATE_SEQUENCE) { return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_ALREADY_EXIST.getErrorCode()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index 7d784dc..7eea568 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -53,6 +53,7 @@ import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.SQLCloseable; @@ -188,7 +189,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar } try { // Just compile top level query without optimizing to get ResultSetMetaData - QueryPlan plan = statement.compilePlan(this); + QueryPlan plan = statement.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); return new PhoenixResultSetMetaData(this.getConnection(), plan.getProjector()); } finally { int lastSetBit = 0; @@ -211,7 +212,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar } } try { - StatementPlan plan = statement.compilePlan(this); + StatementPlan plan = statement.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); return plan.getParameterMetaData(); } finally { int lastSetBit = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 3c27cc9..d4c677b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -33,9 +33,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import com.google.common.base.Throwables; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ColumnResolver; @@ -107,6 +104,7 @@ import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; +import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; @@ -118,6 +116,10 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; +import com.google.common.base.Throwables; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; + /** * @@ -193,7 +195,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException { - QueryPlan plan = stmt.compilePlan(this); + QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); return connection.getQueryServices().getOptimizer().optimize(this, plan); } @@ -203,10 +205,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @Override public PhoenixResultSet call() throws Exception { try { - QueryPlan plan = stmt.compilePlan(PhoenixStatement.this); + QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); plan = connection.getQueryServices().getOptimizer().optimize( PhoenixStatement.this, plan); - plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction()); PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector()); resultSets.add(rs); setLastQueryPlan(plan); @@ -241,8 +242,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho // since they'd update data directly from coprocessors, and should thus operate on // the latest state try { - MutationPlan plan = stmt.compilePlan(PhoenixStatement.this); - plan.getContext().getSequenceManager().validateSequences(stmt.getSequenceAction()); + MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); MutationState state = plan.execute(); connection.getMutationState().join(state); if (connection.getAutoCommit()) { @@ -273,7 +273,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } protected static interface CompilableStatement extends BindableStatement { - public <T extends StatementPlan> T compilePlan (PhoenixStatement stmt) throws SQLException; + public <T extends StatementPlan> T compilePlan (PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException; } private static class ExecutableSelectStatement extends SelectStatement implements CompilableStatement { @@ -284,11 +284,13 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public QueryPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { SelectStatement select = SubselectRewriter.flatten(this, stmt.getConnection()); ColumnResolver resolver = FromCompiler.getResolverForQuery(select, stmt.getConnection()); select = StatementNormalizer.normalize(select, resolver); - return new QueryCompiler(stmt, select, resolver).compile(); + QueryPlan plan = new QueryCompiler(stmt, select, resolver).compile(); + plan.getContext().getSequenceManager().validateSequences(seqAction); + return plan; } } @@ -342,9 +344,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public QueryPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { CompilableStatement compilableStmt = getStatement(); - final StatementPlan plan = compilableStmt.compilePlan(stmt); + final StatementPlan plan = compilableStmt.compilePlan(stmt, Sequence.ValueOp.VALIDATE_SEQUENCE); List<String> planSteps = plan.getExplainPlan().getPlanSteps(); List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size()); for (String planStep : planSteps) { @@ -430,9 +432,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { UpsertCompiler compiler = new UpsertCompiler(stmt); - return compiler.compile(this); + MutationPlan plan = compiler.compile(this); + plan.getContext().getSequenceManager().validateSequences(seqAction); + return plan; } } @@ -443,9 +447,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { DeleteCompiler compiler = new DeleteCompiler(stmt); - return compiler.compile(this); + MutationPlan plan = compiler.compile(this); + plan.getContext().getSequenceManager().validateSequences(seqAction); + return plan; } } @@ -458,7 +464,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { CreateTableCompiler compiler = new CreateTableCompiler(stmt); return compiler.compile(this); } @@ -473,7 +479,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { CreateIndexCompiler compiler = new CreateIndexCompiler(stmt); return compiler.compile(this); } @@ -487,7 +493,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { CreateSequenceCompiler compiler = new CreateSequenceCompiler(stmt); return compiler.compile(this); } @@ -502,7 +508,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { DropSequenceCompiler compiler = new DropSequenceCompiler(stmt); return compiler.compile(this); } @@ -516,7 +522,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { final StatementContext context = new StatementContext(stmt); return new MutationPlan() { @@ -557,7 +563,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { final StatementContext context = new StatementContext(stmt); return new MutationPlan() { @@ -598,7 +604,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { final StatementContext context = new StatementContext(stmt); return new MutationPlan() { @@ -639,7 +645,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { final StatementContext context = new StatementContext(stmt); return new MutationPlan() { @@ -680,7 +686,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho @SuppressWarnings("unchecked") @Override - public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException { + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { final StatementContext context = new StatementContext(stmt); return new MutationPlan() { @@ -898,14 +904,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho if (stmt.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(query); } - return stmt.compilePlan(this); + return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); } public MutationPlan compileMutation(CompilableStatement stmt, String query) throws SQLException { if (!stmt.getOperation().isMutation()) { throw new ExecuteUpdateNotApplicableException(query); } - return stmt.compilePlan(this); + return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); } public MutationPlan compileMutation(String sql) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java index d9edf75..6594f49 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java @@ -18,11 +18,9 @@ package org.apache.phoenix.parse; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; -import org.apache.phoenix.schema.Sequence; public interface BindableStatement { public int getBindCount(); public Operation getOperation(); - public Sequence.Action getSequenceAction(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java index 077671e..b15b325 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.List; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; -import org.apache.phoenix.schema.Sequence.Action; public class DeleteStatement extends SingleTableStatement implements FilterableStatement { private final ParseNode whereNode; @@ -71,9 +70,4 @@ public class DeleteStatement extends SingleTableStatement implements FilterableS public Operation getOperation() { return Operation.DELETE; } - - @Override - public Action getSequenceAction() { - return Action.RESERVE; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java index 788378e..49ce573 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java @@ -18,7 +18,6 @@ package org.apache.phoenix.parse; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; -import org.apache.phoenix.schema.Sequence.Action; public class ExplainStatement implements BindableStatement { private final BindableStatement statement; @@ -40,9 +39,4 @@ public class ExplainStatement implements BindableStatement { public Operation getOperation() { return Operation.QUERY; } - - @Override - public Action getSequenceAction() { - return Action.VALIDATE; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java index 938ac40..610b2d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java @@ -18,7 +18,6 @@ package org.apache.phoenix.parse; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; -import org.apache.phoenix.schema.Sequence.Action; public abstract class MutableStatement implements BindableStatement { @@ -26,9 +25,4 @@ public abstract class MutableStatement implements BindableStatement { public Operation getOperation() { return Operation.UPSERT; } - - @Override - public Action getSequenceAction() { - return Action.RESERVE; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java index 8caea30..055bc18 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java @@ -25,7 +25,6 @@ import org.apache.phoenix.expression.function.CountAggregateFunction; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionInfo; -import org.apache.phoenix.schema.Sequence.Action; /** * @@ -182,11 +181,6 @@ public class SelectStatement implements FilterableStatement { return Operation.QUERY; } - @Override - public Action getSequenceAction() { - return Action.RESERVE; - } - public boolean isJoin() { return fromTable.size() > 1 || (fromTable.size() > 0 && fromTable.get(0) instanceof JoinTableNode); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 2c7ecbb..035c77b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -88,7 +88,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated long createSequence(String tenantId, String schemaName, String sequenceName, long startWith, long incrementBy, long cacheSize, long timestamp) throws SQLException; long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException; - void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.Action action) throws SQLException; + void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException; void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException; long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException; void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index bf6a450..a9b9a56 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -1583,7 +1583,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * Verifies that sequences exist and reserves values for them if reserveValues is true */ @Override - public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.Action action) throws SQLException { + public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 0, action); } @@ -1600,10 +1600,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement */ @Override public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException { - incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 1, Sequence.Action.RESERVE); + incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, 1, Sequence.ValueOp.RESERVE_SEQUENCE); } - private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, int factor, Sequence.Action action) throws SQLException { + private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[] values, SQLException[] exceptions, int factor, Sequence.ValueOp action) throws SQLException { List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size()); for (SequenceKey key : keys) { Sequence newSequences = new Sequence(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 75cfe92..d264d38 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -333,7 +333,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple @Override public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions, Sequence.Action action) throws SQLException { + SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { int i = 0; for (SequenceKey key : sequenceKeys) { Long value = sequenceMap.get(key); @@ -401,4 +401,4 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public boolean supportsFeature(Feature feature) { return false; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index ac7c690..cae52dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -184,7 +184,7 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple @Override public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, - SQLException[] exceptions, Sequence.Action action) throws SQLException { + SQLException[] exceptions, Sequence.ValueOp action) throws SQLException { getDelegate().validateSequences(sequenceKeys, timestamp, values, exceptions, action); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java index 4fc985a..19b5569 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java @@ -52,7 +52,8 @@ import com.google.common.collect.Lists; public class Sequence { public static final int SUCCESS = 0; - public enum Action {VALIDATE, RESERVE}; + public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE}; + public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE}; // Pre-compute index of sequence key values to prevent binary search private static final KeyValue CURRENT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SEQUENCE_FAMILY_BYTES, CURRENT_VALUE_BYTES); @@ -119,13 +120,13 @@ public class Sequence { return value.isDeleted() ? null : value; } - public long incrementValue(long timestamp, int factor, Action action) throws EmptySequenceCacheException { + public long incrementValue(long timestamp, int factor, ValueOp action) throws EmptySequenceCacheException { SequenceValue value = findSequenceValue(timestamp); if (value == null) { throw EMPTY_SEQUENCE_CACHE_EXCEPTION; } if (value.currentValue == value.nextValue) { - if (action == Action.VALIDATE) { + if (action == ValueOp.VALIDATE_SEQUENCE) { return value.currentValue; } throw EMPTY_SEQUENCE_CACHE_EXCEPTION; @@ -162,7 +163,7 @@ public class Sequence { private Append newReturn(SequenceValue value) { byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName()); Append append = new Append(key); - byte[] opBuf = new byte[] {(byte)SequenceRegionObserver.Op.RETURN_SEQUENCE.ordinal()}; + byte[] opBuf = new byte[] {(byte)MetaOp.RETURN_SEQUENCE.ordinal()}; append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, opBuf); append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PDataType.LONG.toBytes(value.nextValue)); Map<byte[], List<KeyValue>> familyMap = append.getFamilyMap(); @@ -216,7 +217,7 @@ public class Sequence { return currentValue; } - public Increment newIncrement(long timestamp, Sequence.Action action) { + public Increment newIncrement(long timestamp, Sequence.ValueOp action) { Increment inc = new Increment(SchemaUtil.getSequenceKey(key.getTenantId(), key.getSchemaName(), key.getSequenceName())); // It doesn't matter what we set the amount too - we always use the values we get // from the Get we do to prevent any race conditions. All columns that get added @@ -345,7 +346,7 @@ public class Sequence { public Append createSequence(long startWith, long incrementBy, long cacheSize, long timestamp) { byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName()); Append append = new Append(key); - append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)SequenceRegionObserver.Op.CREATE_SEQUENCE.ordinal()}); + append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)MetaOp.CREATE_SEQUENCE.ordinal()}); if (timestamp != HConstants.LATEST_TIMESTAMP) { append.setAttribute(SequenceRegionObserver.MAX_TIMERANGE_ATTRIB, Bytes.toBytes(timestamp)); } @@ -380,7 +381,7 @@ public class Sequence { public Append dropSequence(long timestamp) { byte[] key = SchemaUtil.getSequenceKey(this.key.getTenantId(), this.key.getSchemaName(), this.key.getSequenceName()); Append append = new Append(key); - append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)SequenceRegionObserver.Op.DROP_SEQUENCE.ordinal()}); + append.setAttribute(SequenceRegionObserver.OPERATION_ATTRIB, new byte[] {(byte)MetaOp.DROP_SEQUENCE.ordinal()}); if (timestamp != HConstants.LATEST_TIMESTAMP) { append.setAttribute(SequenceRegionObserver.MAX_TIMERANGE_ATTRIB, Bytes.toBytes(timestamp)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c3067a75/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java index b2ac2f2..ebb9023 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java @@ -157,11 +157,6 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); this.queryPlan = pstmt.compileQuery(selectStatement); - // TODO: absorb this call to validate/reserve sequences into an existing API or - // create a new API on QueryPlan called reserveSequences(). - // We don't want to do it during compile because compile gets called multiple times, - // which would reserve sequences more than once. - queryPlan.getContext().getSequenceManager().validateSequences(queryPlan.getStatement().getSequenceAction()); } catch(Exception exception) { LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage())); throw new RuntimeException(exception);