PHOENIX-1954 Reserve chunks of numbers for a sequence (Jan Fernando)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3b1bfa0d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3b1bfa0d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3b1bfa0d Branch: refs/heads/calcite Commit: 3b1bfa0d7b83f0b9ee0ad535d6e1f99777c14cb6 Parents: 984e622 Author: James Taylor <jtay...@salesforce.com> Authored: Thu Jul 9 20:49:03 2015 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Jul 9 20:56:48 2015 -0700 ---------------------------------------------------------------------- .../end2end/SequenceBulkAllocationIT.java | 1286 ++++++++++++++++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 5 +- .../apache/phoenix/compile/SequenceManager.java | 70 +- .../compile/SequenceValueExpression.java | 14 +- .../coprocessor/SequenceRegionObserver.java | 64 +- .../phoenix/exception/SQLExceptionCode.java | 2 + .../apache/phoenix/jdbc/PhoenixStatement.java | 10 +- .../apache/phoenix/parse/ParseNodeFactory.java | 6 +- .../phoenix/parse/SequenceValueParseNode.java | 8 +- .../phoenix/query/ConnectionQueryServices.java | 5 +- .../query/ConnectionQueryServicesImpl.java | 23 +- .../query/ConnectionlessQueryServicesImpl.java | 20 +- .../query/DelegateConnectionQueryServices.java | 13 +- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../org/apache/phoenix/schema/Sequence.java | 107 +- .../phoenix/schema/SequenceAllocation.java | 48 + .../org/apache/phoenix/util/SequenceUtil.java | 47 +- .../phoenix/schema/SequenceAllocationTest.java | 59 + .../apache/phoenix/util/SequenceUtilTest.java | 54 + 19 files changed, 1763 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java new file mode 100644 index 0000000..e7db1ec --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SequenceBulkAllocationIT.java @@ -0,0 +1,1286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +/** + * Suite of integration tests that validate that Bulk Allocation of Sequence values + * using the NEXT <n> VALUES FOR <seq> syntax works as expected and interacts + * correctly with NEXT VALUE FOR <seq> and CURRENT VALUE FOR <seq>. + * + * All tests are run with both a generic connection and a multi-tenant connection. + * + */ +@RunWith(Parameterized.class) +public class SequenceBulkAllocationIT extends BaseClientManagedTimeIT { + + private static final long BATCH_SIZE = 3; + private static final String SELECT_NEXT_VALUE_SQL = + "SELECT NEXT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1"; + private static final String SELECT_CURRENT_VALUE_SQL = + "SELECT CURRENT VALUE FOR %s FROM SYSTEM.\"SEQUENCE\" LIMIT 1"; + private static final String CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE = + "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s CACHE %s"; + private static final String CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE = + "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CACHE %s"; + private static final String CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE = + "CREATE SEQUENCE bulkalloc.alpha START WITH %s INCREMENT BY %s MINVALUE %s MAXVALUE %s CYCLE CACHE %s"; + + + private Connection conn; + private String tenantId; + + public SequenceBulkAllocationIT(String tenantId) { + this.tenantId = tenantId; + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String, String> props = getDefaultProps(); + // Must update config before starting server + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @After + public void tearDown() throws Exception { + // close any open connection between tests, so that connections are not leaked + if (conn != null) { + conn.close(); + } + } + + @Parameters + public static Object[] data() { + return new Object[] {null, "tenant1"}; + } + + + @Test + public void testSequenceParseNextValuesWithNull() throws Exception { + nextConnection(); + try { + conn.createStatement().executeQuery( + "SELECT NEXT NULL VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + @Test + public void testSequenceParseNextValuesWithNonNumber() throws Exception { + nextConnection(); + try { + conn.createStatement().executeQuery( + "SELECT NEXT '89b' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + + @Test + public void testSequenceParseNextValuesWithNegativeNumber() throws Exception { + nextConnection(); + try { + conn.createStatement().executeQuery( + "SELECT NEXT '-1' VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("null is not allowed to be used for <n> in NEXT <n> VALUES FOR <seq>"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + @Test + public void testParseNextValuesSequenceWithZeroAllocated() throws Exception { + nextConnection(); + try { + conn.createStatement().executeQuery( + "SELECT NEXT 0 VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("Only integers and longs are allowed to be used for <n> in NEXT <n> VALUES FOR <seq>"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + + @Test + public void testNextValuesForSequenceWithNoAllocatedValues() throws Exception { + // Create Sequence + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1) + .numAllocated(100).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + // Bulk Allocate Sequence Slots + final int currentValueAfterAllocation = 100; + reserveSlotsInBulkAndAssertValue(1, props.numAllocated); + assertExpectedStateInSystemSequence(props, 101); + assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(101); + } + + @Test + /** + * Validates we can invoke NEXT <n> VALUES FOR using bind vars. + */ + public void testNextValuesForSequenceUsingBinds() throws Exception { + + // Create Sequence + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(1) + .numAllocated(100).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + // Allocate 100 slots using SQL with Bind Params and a PreparedStatement + final int currentValueAfterAllocation = 100; + reserveSlotsInBulkUsingBindsAndAssertValue(1,props.numAllocated); + assertExpectedStateInSystemSequence(props, 101); + assertExpectedNumberOfValuesAllocated(1, currentValueAfterAllocation, props.incrementBy, props.numAllocated); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(101); + } + + + @Test + public void testNextValuesForSequenceWithPreviouslyAllocatedValues() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 1100; + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + int startValueAfterAllocation = 101; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(nextValueAfterAllocation); + } + + + @Test + /** + * Validates that if we close a connection after performing + * NEXT <n> VALUES FOR <seq> the values are correctly returned from + * the latest batch. + */ + public void testConnectionCloseReturnsSequenceValuesCorrectly() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(2).startsWith(1).cacheSize(100) + .numAllocated(100).build(); + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(3); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 399; + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + int startValueAfterAllocation = 201; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + + // Close the Connection + conn.close(); + + // Test that sequence, doesn't have gaps after closing the connection + nextConnection(); + assertExpectedNextValueForSequence(nextValueAfterAllocation); + assertExpectedCurrentValueForSequence(nextValueAfterAllocation); + + } + + @Test + /** + * Validates that calling NEXT <n> VALUES FOR <seq> works correctly with UPSERT. + */ + public void testNextValuesForSequenceWithUpsert() throws Exception { + + // Create Sequence + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + nextConnection(); + createSequenceWithNoMinMax(props); + + // Create TABLE + nextGenericConnection(); + conn.createStatement().execute("CREATE TABLE bulkalloc.test ( id INTEGER NOT NULL PRIMARY KEY)"); + nextConnection(); + + // Grab batch from Sequence + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + assertExpectedStateInSystemSequence(props, 101); + + + // Perform UPSERT and validate Sequence was incremented as expected + conn.createStatement().execute("UPSERT INTO bulkalloc.test (id) VALUES (NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha)"); + conn.commit(); + assertExpectedStateInSystemSequence(props, 1101); + + // SELECT values out and verify + nextConnection(); + String query = "SELECT id, NEXT VALUE FOR bulkalloc.alpha FROM bulkalloc.test"; + ResultSet rs = conn.prepareStatement(query).executeQuery(); + assertTrue(rs.next()); + assertEquals(101, rs.getInt(1)); // Threw out cache of 100, incremented by 1000 + assertEquals(1101, rs.getInt(2)); + assertFalse(rs.next()); + } + + + + + @Test + public void testNextValuesForSequenceWithIncrementBy() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(3).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(4); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 3298; + int startValueAfterAllocation = 301; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(3298); + assertExpectedNextValueForSequence(3301); + } + + @Test + public void testNextValuesForSequenceWithNegativeIncrementBy() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(-1).startsWith(2000).cacheSize(100) + .numAllocated(1000).build(); + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(2000); + assertExpectedCurrentValueForSequence(2000); + assertExpectedNextValueForSequence(1999); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 901; + int startValueAfterAllocation = 1900; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(901); + assertExpectedNextValueForSequence(900); + } + + @Test + public void testNextValuesForSequenceWithNegativeIncrementByGreaterThanOne() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(-5).startsWith(2000).cacheSize(100) + .numAllocated(100).build(); + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + // Pull first batch from Sequence + assertExpectedNextValueForSequence(2000); + assertExpectedCurrentValueForSequence(2000); + assertExpectedNextValueForSequence(1995); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 1005; + int startValueAfterAllocation = 1500; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(1005); + assertExpectedNextValueForSequence(1000); + } + + + @Test + /** + * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that + * we exceed the max value of the sequence we throw an exception. Allocating sequence values in bulk + * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they + * have access to all slots requested. + */ + public void testNextValuesForSequenceExceedsMaxValue() throws Exception { + final SequenceProperties sequenceProps = + new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100) + .numAllocated(1000).minValue(100).maxValue(900).build(); + + nextConnection(); + createSequenceWithMinMax(sequenceProps); + nextConnection(); + + // Pull first batch from the sequence + assertExpectedNextValueForSequence(100); + assertExpectedCurrentValueForSequence(100); + assertExpectedNextValueForSequence(101); + + // Attempt to bulk Allocate more slots than available + try { + conn.createStatement().executeQuery( + "SELECT NEXT " + sequenceProps.numAllocated + + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + + // Assert sequence didn't advance + assertExpectedCurrentValueForSequence(101); + assertExpectedNextValueForSequence(102); + } + + @Test + /** + * Validates that for NEXT <n> VALUES FOR if you try an allocate more slots such that that + * we exceed the min value of the sequence we throw an exception. Allocating sequence values in bulk + * should be an all or nothing operation - if the operation succeeds clients are guaranteed that they + * have access to all slots requested. + */ + public void testNextValuesForSequenceExceedsMinValue() throws Exception { + final SequenceProperties sequenceProps = + new SequenceProperties.Builder().incrementBy(-5).startsWith(900).cacheSize(100) + .numAllocated(160).minValue(100).maxValue(900).build(); + + nextConnection(); + createSequenceWithMinMax(sequenceProps); + nextConnection(); + + // Pull first batch from the sequence + assertExpectedNextValueForSequence(900); + assertExpectedCurrentValueForSequence(900); + assertExpectedNextValueForSequence(895); + + // Attempt to bulk Allocate more slots than available + try { + conn.createStatement().executeQuery( + "SELECT NEXT " + sequenceProps.numAllocated + + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("Invoking SELECT NEXT VALUES should have thrown Reached Max Value Exception"); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + + // Assert sequence didn't advance (we still throw out the cached values) + assertExpectedCurrentValueForSequence(895); + assertExpectedNextValueForSequence(890); + } + + + @Test + /** + * Validates that if we don't exceed the limit bulk allocation works with sequences with a + * min and max defined. + */ + public void testNextValuesForSequenceWithMinMaxDefined() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100) + .numAllocated(1000).minValue(100).maxValue(6000).build(); + + nextConnection(); + createSequenceWithMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(100); + assertExpectedCurrentValueForSequence(100); + assertExpectedNextValueForSequence(105); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 5595; + int startValueAfterAllocation = 600; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(5595); + assertExpectedNextValueForSequence(5600); + } + + @Test + public void testNextValuesForSequenceWithDefaultMax() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100) + .numAllocated(Long.MAX_VALUE - 100).build(); + + // Create Sequence + nextConnection(); + createSequenceWithMinMax(props); + nextConnection(); + + // Bulk Allocate Sequence Slots + long currentValueAfterAllocation = 100; + long startValueAfterAllocation = Long.MAX_VALUE; + reserveSlotsInBulkAndAssertValue(currentValueAfterAllocation, props.numAllocated); + assertExpectedStateInSystemSequence(props, startValueAfterAllocation); + + // Try and get next value + try { + conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, "bulkalloc.alpha")); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + @Test + /** + * Validates that if our current or start value is > 0 and we ask for Long.MAX + * and overflow to the next value, the correct Exception is thrown when + * the expression is evaluated. + */ + public void testNextValuesForSequenceOverflowAllocation() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(100).cacheSize(100) + .numAllocated(Long.MAX_VALUE).build(); + + // Create Sequence + nextConnection(); + createSequenceWithMinMax(props); + nextConnection(); + + // Bulk Allocate Sequence Slots + try { + conn.createStatement().executeQuery( + "SELECT NEXT " + Long.MAX_VALUE + + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + } + + + @Test + /** + * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works + * as expected. + */ + public void testNextValuesForSequenceAllocationLessThanCacheSize() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100) + .numAllocated(50).minValue(100).maxValue(6000).build(); + + nextConnection(); + createSequenceWithMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(100); + assertExpectedCurrentValueForSequence(100); + assertExpectedNextValueForSequence(105); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 355; + int startValueAfterAllocation = 110; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertExpectedStateInSystemSequence(props, 600); + assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated); + + // Assert standard Sequence Operations return expected values + // 105 + (50 * 5) = 355 + assertExpectedCurrentValueForSequence(355); + assertExpectedNextValueForSequence(360); + assertExpectedNextValueForSequence(365); + assertExpectedNextValueForSequence(370); + } + + @Test + /** + * Validates that specifying an bulk allocation less than the size of the cache defined on the sequence works + * as expected if we don't have enough values in the cache to support the allocation. + */ + public void testNextValuesForInsufficentCacheValuesAllocationLessThanCacheSize() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100) + .numAllocated(50).minValue(100).maxValue(6000).build(); + + nextConnection(); + createSequenceWithMinMax(props); + nextConnection(); + + // Allocate 51 slots, only 49 will be left + int currentValueAfter51Allocations = 355; // 100 + 51 * 5 + for (int i = 100; i <= currentValueAfter51Allocations; i = i + 5) { + assertExpectedNextValueForSequence(i); + } + assertExpectedCurrentValueForSequence(currentValueAfter51Allocations); + + // Bulk Allocate 50 Sequence Slots which greater than amount left in cache + // This should throw away rest of the cache, and allocate the request slot + // from the next start value + int currentValueAfterAllocation = 845; + int startValueAfterAllocation = 600; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(845); + assertExpectedNextValueForSequence(850); + assertExpectedNextValueForSequence(855); + assertExpectedNextValueForSequence(860); + } + + @Test + /** + * Validates that for NEXT <n> VALUES FOR is not supported on Sequences that have the + * CYCLE flag set to true. + */ + public void testNextValuesForSequenceWithCycles() throws Exception { + final SequenceProperties sequenceProps = + new SequenceProperties.Builder().incrementBy(5).startsWith(100).cacheSize(100) + .numAllocated(1000).minValue(100).maxValue(900).build(); + + nextConnection(); + createSequenceWithMinMaxAndCycle(sequenceProps); + nextConnection(); + + // Full first batch from the sequence + assertExpectedNextValueForSequence(100); + assertExpectedCurrentValueForSequence(100); + assertExpectedNextValueForSequence(105); + + // Attempt to bulk Allocate more slots than available + try { + conn.createStatement().executeQuery( + "SELECT NEXT " + sequenceProps.numAllocated + + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\" LIMIT 1"); + fail("Invoking SELECT NEXT VALUES should have failed as operation is not supported for sequences with Cycles."); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode(), + e.getErrorCode()); + assertTrue(e.getNextException() == null); + } + + // Assert sequence didn't advance + assertExpectedCurrentValueForSequence(105); + assertExpectedNextValueForSequence(110); + assertExpectedNextValueForSequence(115); + } + + @Test + /** + * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expression and the + * CURRENT VALUE FOR expression work correctly when used in the same statement. + */ + public void testCurrentValueForAndNextValuesForExpressionsForSameSequence() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + + // Bulk Allocate Sequence Slots + int currentValueAfterAllocation = 1100; + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + int startValueAfterAllocation = 101; + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT CURRENT VALUE FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + int currentValueFor = rs.getInt(1); + int nextValuesFor = rs.getInt(2); + assertEquals("Expected the next value to be first value reserved", startValueAfterAllocation, nextValuesFor); + assertEquals("Expected current value to be the same as next value", startValueAfterAllocation, currentValueFor); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(nextValueAfterAllocation); + } + + @Test + /** + * Validates that if we have multiple NEXT <n> VALUES FOR <seq> expressions for the *same* sequence + * in a statement we only process the one which has the highest value of <n> and return the start + * value for that for all expressions. + */ + public void testMultipleNextValuesForExpressionsForSameSequence() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + + // Bulk Allocate Sequence Slots - One for 5 and one for 1000, 1000 should have precedence + int currentValueAfterAllocation = 1100; + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + int startValueAfterAllocation = 101; + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT 5 VALUES FOR bulkalloc.alpha, NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + int firstValue = rs.getInt(1); + int secondValue = rs.getInt(2); + assertEquals("Expected both expressions to return the same value", firstValue, secondValue); + assertEquals("Expected the value returned to be the highest allocation", startValueAfterAllocation, firstValue); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(nextValueAfterAllocation); + } + + @Test + /** + * Validates that if we have NEXT VALUE FOR <seq> and NEXT <n> VALUES FOR <seq> expressions for the *same* sequence + * in a statement we only process way and honor the value of the highest value of <n>, where for + * NEXT VALUE FOR <seq> is assumed to be 1. + */ + public void testMultipleDifferentExpressionsForSameSequence() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + // Pull First Batch from Sequence + assertExpectedNextValueForSequence(1); + + // Bulk Allocate Sequence Slots and Get Next Value in Same Statement + int currentValueAfterAllocation = 1100; + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + int startValueAfterAllocation = 101; + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT VALUE FOR bulkalloc.alpha, " + + "NEXT " + props.numAllocated + " VALUES FOR bulkalloc.alpha, " + + "CURRENT VALUE FOR bulkalloc.alpha, " + + "NEXT 999 VALUES FOR bulkalloc.alpha " + + "FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + assertBulkAllocationSucceeded(props, currentValueAfterAllocation, startValueAfterAllocation); + + // Assert all values returned are the same + // Expect them to be the highest value from NEXT VALUE or NEXT <n> VALUES FOR + int previousVal = 0; + for (int i = 1; i <= 4; i++) { + int currentVal = rs.getInt(i); + if (i != 1) { + assertEquals( + "Expected all NEXT VALUE FOR and NEXT <n> VALUES FOR expressions to return the same value", + previousVal, currentVal); + } + previousVal = currentVal; + } + + // Assert standard Sequence Operations return expected values + assertExpectedCurrentValueForSequence(currentValueAfterAllocation); + assertExpectedNextValueForSequence(nextValueAfterAllocation); + } + + + @Test + /** + * Validates that using NEXT <n> VALUES FOR on different sequences in the + * same statement with *different* values of <n> works as expected. This + * test validates that we keep our numAllocated array and sequence keys in + * sync during the sequence management process. + */ + public void testMultipleNextValuesForExpressionsForDifferentSequences() throws Exception { + + nextConnection(); + conn.createStatement().execute("CREATE SEQUENCE bulkalloc.alpha START WITH 30 INCREMENT BY 3 CACHE 100"); + conn.createStatement().execute("CREATE SEQUENCE bulkalloc.beta START WITH 100 INCREMENT BY 5 CACHE 50"); + nextConnection(); + + // Bulk Allocate Sequence Slots for Two Sequences + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT 100 VALUES FOR bulkalloc.alpha, NEXT 1000 VALUES FOR bulkalloc.beta FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + assertEquals(30, rs.getInt(1)); + assertEquals(100, rs.getInt(2)); + + // Assert standard Sequence Operations return expected values + for (int i = 330; i < 330 + (2 * 100); i += 3) { + assertExpectedCurrentValueForSequence(i - 3, "bulkalloc.alpha"); + assertExpectedNextValueForSequence(i, "bulkalloc.alpha"); + } + + for (int i = 5100; i < 5100 + (2 * 1000); i += 5) { + assertExpectedCurrentValueForSequence(i - 5, "bulkalloc.beta"); + assertExpectedNextValueForSequence(i, "bulkalloc.beta"); + } + } + + @Test + /** + * Validates that calling NEXT <n> VALUES FOR with EXPLAIN PLAN doesn't use + * allocate any slots. + */ + public void testExplainPlanValidatesSequences() throws Exception { + + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + + nextGenericConnection(); + conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)"); + nextConnection(); + + // Bulk Allocate Sequence Slots + int startValueAfterAllocation = 30; + reserveSlotsInBulkAndAssertValue(startValueAfterAllocation, props.numAllocated); + + // Execute EXPLAIN PLAN multiple times, which should not change Sequence values + for (int i = 0; i < 3; i++) { + conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl"); + } + + // Validate the current value was not advanced and was the starting value + assertExpectedStateInSystemSequence(props, 3030); + + // Assert standard Sequence Operations return expected values + int startValue = 3030; + for (int i = startValue; i < startValue + (2 * props.cacheSize); i += props.incrementBy) { + assertExpectedCurrentValueForSequence(i - props.incrementBy); + assertExpectedNextValueForSequence(i); + } + } + + @Test + public void testExplainPlanForNextValuesFor() throws Exception { + + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(3).startsWith(30).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextGenericConnection(); + conn.createStatement().execute("CREATE TABLE bulkalloc.simpletbl (k BIGINT NOT NULL PRIMARY KEY)"); + nextConnection(); + + // Execute EXPLAIN PLAN which should not change Sequence values + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT NEXT 1000 VALUES FOR bulkalloc.alpha FROM bulkalloc.simpletbl"); + + // Assert output for Explain Plain result is as expected + assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER BULKALLOC.SIMPLETBL\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + "CLIENT RESERVE VALUES FROM 1 SEQUENCE", QueryUtil.getExplainPlan(rs)); + } + + + /** + * Performs a multithreaded test whereby we interleave reads from the result set of + * NEXT VALUE FOR and NEXT <n> VALUES FOR to make sure we get expected values with the + * following order of execution: + * + * 1) Execute expression NEXT <n> VALUES FOR <seq> + * 2) Execute expression NEXT VALUE FOR <seq> + * 3) Read back value from expression NEXT VALUE FOR <seq> via rs.next() + * 4) Read back value from expression NEXT <n> VALUES FOR <seq> via rs.next() + */ + public void testNextValuesForMixedWithNextValueForMultiThreaded() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + + // Bulk Allocate Sequence Slots + final long startValueAfterAllocation1 = 101; + final long startValueAfterAllocation2 = 1101; + final long numSlotToAllocate = props.numAllocated; + + // Setup and run tasks in independent Threads + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + Callable<Long> task1 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + latch1.countDown(); // Allows NEXT VALUE FOR thread to proceed + latch2.await(); // Waits until NEXT VALUE FOR thread reads and increments currentValue + rs.next(); + return rs.getLong(1); + } + + }; + + Callable<Long> task2 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + latch1.await(); // Wait for execution of NEXT <n> VALUES FOR expression + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT VALUE FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + rs.next(); + long retVal = rs.getLong(1); + latch2.countDown(); // Allow NEXT <n> VALUES for thread to completed + return retVal; + } + + }; + + @SuppressWarnings("unchecked") + List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 20, TimeUnit.SECONDS); + assertEquals(startValueAfterAllocation1, futures.get(0).get(10, TimeUnit.SECONDS).longValue()); + assertEquals(startValueAfterAllocation2, futures.get(1).get(10, TimeUnit.SECONDS).longValue()); + } finally { + executorService.shutdown(); + } + } + + @Test + public void testMultipleNextValuesWithDiffAllocsForMultiThreaded() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + assertExpectedNextValueForSequence(1); + assertExpectedCurrentValueForSequence(1); + assertExpectedNextValueForSequence(2); + + // Bulk Allocate Sequence Slots + final long startValueAfterAllocation1 = 101; + final long startValueAfterAllocation2 = 1101; + final long numSlotToAllocate1 = 1000; + final long numSlotToAllocate2 = 100; + + // Setup and run tasks in independent Threads + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + Callable<Long> task1 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + rs.next(); + latch1.countDown(); // Allows other thread to proceed + latch2.await(); + return rs.getLong(1); + } + + }; + + Callable<Long> task2 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + rs.next(); + long retVal = rs.getLong(1); + latch2.countDown(); // Allow thread to completed + return retVal; + } + + }; + + @SuppressWarnings("unchecked") + List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS); + + // Retrieve value from Thread running NEXT <n> VALUES FOR + Long retValue1 = futures.get(0).get(5, TimeUnit.SECONDS); + assertEquals(startValueAfterAllocation1, retValue1.longValue()); + + // Retrieve value from Thread running NEXT VALUE FOR + Long retValue2 = futures.get(1).get(5, TimeUnit.SECONDS); + assertEquals(startValueAfterAllocation2, retValue2.longValue()); + + } finally { + executorService.shutdown(); + } + } + + @Test + public void testMultipleNextValuesWithSameAllocsForMultiThreaded() throws Exception { + final SequenceProperties props = + new SequenceProperties.Builder().incrementBy(1).startsWith(1).cacheSize(100) + .numAllocated(1000).build(); + + nextConnection(); + createSequenceWithNoMinMax(props); + nextConnection(); + + // Bulk Allocate Sequence Slots + final long startValueAfterAllocation1 = 1; + final long startValueAfterAllocation2 = 1001; + final long numSlotToAllocate1 = 1000; + final long numSlotToAllocate2 = 1000; + + // Setup and run tasks in independent Threads + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + Callable<Long> task1 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate1 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + latch1.countDown(); // Allows other thread to proceed + latch2.await(); + rs.next(); + return rs.getLong(1); + } + + }; + + Callable<Long> task2 = new Callable<Long>() { + + @Override + public Long call() throws Exception { + latch1.await(); // Wait for other thread to execut of NEXT <n> VALUES FOR expression + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate2 + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + rs.next(); + long retVal = rs.getLong(1); + latch2.countDown(); // Allow thread to completed + return retVal; + } + + }; + + // Because of the way the threads are interleaved the ranges used by each thread will the reserve + // of the order to statement execution + @SuppressWarnings("unchecked") + List<Future<Long>> futures = executorService.invokeAll(Lists.newArrayList(task1, task2), 5, TimeUnit.SECONDS); + assertEquals(startValueAfterAllocation2, futures.get(0).get(5, TimeUnit.SECONDS).longValue()); + assertEquals(startValueAfterAllocation1, futures.get(1).get(5, TimeUnit.SECONDS).longValue()); + + } finally { + executorService.shutdown(); + } + } + + // ----------------------------------------------------------------- + // Private Helper Methods + // ----------------------------------------------------------------- + private void assertBulkAllocationSucceeded(SequenceProperties props, + int currentValueAfterAllocation, int startValueAfterAllocation) throws SQLException { + int nextValueAfterAllocation = currentValueAfterAllocation + props.incrementBy; + assertExpectedStateInSystemSequence(props, nextValueAfterAllocation); + assertExpectedNumberOfValuesAllocated(startValueAfterAllocation, currentValueAfterAllocation, props.incrementBy, props.numAllocated); + } + + + private void createSequenceWithNoMinMax(final SequenceProperties props) throws SQLException { + conn.createStatement().execute( + String.format(CREATE_SEQUENCE_NO_MIN_MAX_TEMPLATE, props.startsWith, + props.incrementBy, props.cacheSize)); + } + + private void createSequenceWithMinMax(final SequenceProperties props) throws SQLException { + conn.createStatement().execute( + String.format(CREATE_SEQUENCE_WITH_MIN_MAX_TEMPLATE, props.startsWith, + props.incrementBy, props.minValue, props.maxValue, props.cacheSize)); + } + + private void createSequenceWithMinMaxAndCycle(final SequenceProperties props) throws SQLException { + conn.createStatement().execute( + String.format(CREATE_SEQUENCE_WITH_MIN_MAX_AND_CYCLE_TEMPLATE, props.startsWith, + props.incrementBy, props.minValue, props.maxValue, props.cacheSize)); + } + + private void reserveSlotsInBulkAndAssertValue(long expectedValue, long numSlotToAllocate) + throws SQLException { + ResultSet rs = + conn.createStatement().executeQuery( + "SELECT NEXT " + numSlotToAllocate + " VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + assertEquals(expectedValue, rs.getInt(1)); + } + + private void reserveSlotsInBulkUsingBindsAndAssertValue(int expectedValue, long numSlotToAllocate) + throws SQLException { + PreparedStatement ps = conn.prepareStatement("SELECT NEXT ? VALUES FOR bulkalloc.alpha FROM SYSTEM.\"SEQUENCE\""); + ps.setLong(1, numSlotToAllocate); + ResultSet rs = ps.executeQuery(); + assertTrue(rs.next()); + int retValue = rs.getInt(1); + assertEquals(expectedValue, retValue); + } + + private void assertExpectedCurrentValueForSequence(int expectedValue) throws SQLException { + assertExpectedCurrentValueForSequence(expectedValue, "bulkalloc.alpha"); + } + + private void assertExpectedCurrentValueForSequence(int expectedValue, String sequenceName) throws SQLException { + ResultSet rs; + rs = conn.createStatement().executeQuery(String.format(SELECT_CURRENT_VALUE_SQL, sequenceName)); + assertTrue(rs.next()); + assertEquals(expectedValue, rs.getInt(1)); + } + + private void assertExpectedNextValueForSequence(int expectedValue) throws SQLException { + assertExpectedNextValueForSequence(expectedValue, "bulkalloc.alpha"); + } + + private void assertExpectedNextValueForSequence(int expectedValue, String sequenceName) throws SQLException { + ResultSet rs; + rs = conn.createStatement().executeQuery(String.format(SELECT_NEXT_VALUE_SQL, sequenceName)); + assertTrue(rs.next()); + assertEquals(expectedValue, rs.getInt(1)); + } + + + /** + * Returns a non-tenant specific connection. + */ + private void nextGenericConnection() throws Exception { + if (conn != null) conn.close(); + long ts = nextTimestamp(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + } + + private void nextConnection() throws Exception { + if (conn != null) conn.close(); + long ts = nextTimestamp(); + if (tenantId != null) { + // Create tenant specific connection + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(nextTimestamp())); + this.conn = DriverManager.getConnection(getUrl() + ';' + TENANT_ID_ATTRIB + '=' + "tenant1", props); + + } else { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + conn = DriverManager.getConnection(getUrl(), props); + } + } + + private void assertExpectedStateInSystemSequence(SequenceProperties props, long currentValue) + throws SQLException { + // Validate state in System.Sequence + ResultSet rs = + conn.createStatement() + .executeQuery( + "SELECT start_with, current_value, increment_by, cache_size, min_value, max_value, cycle_flag, sequence_schema, sequence_name FROM SYSTEM.\"SEQUENCE\""); + assertTrue(rs.next()); + assertEquals(props.startsWith, rs.getLong("start_with")); + assertEquals(props.incrementBy, rs.getLong("increment_by")); + assertEquals(props.cacheSize, rs.getLong("cache_size")); + assertEquals(false, rs.getBoolean("cycle_flag")); + assertEquals("BULKALLOC", rs.getString("sequence_schema")); + assertEquals("ALPHA", rs.getString("sequence_name")); + assertEquals(currentValue, rs.getLong("current_value")); + assertEquals(props.minValue, rs.getLong("min_value")); + assertEquals(props.maxValue, rs.getLong("max_value")); + assertFalse(rs.next()); + } + + private void assertExpectedNumberOfValuesAllocated(long firstValue, long lastValue, + int incrementBy, long numAllocated) { + int cnt = 0; + for (long i = firstValue; (incrementBy > 0 ? i <= lastValue : i >= lastValue); i += incrementBy) { + cnt++; + } + assertEquals("Incorrect number of values allocated: " + cnt, numAllocated, cnt); + } + + private static class SequenceProperties { + + private final long numAllocated; + private final int incrementBy; + private final int startsWith; + private final int cacheSize; + private final long minValue; + private final long maxValue; + + public SequenceProperties(Builder builder) { + this.numAllocated = builder.numAllocated; + this.incrementBy = builder.incrementBy; + this.startsWith = builder.startsWith; + this.cacheSize = builder.cacheSize; + this.minValue = builder.minValue; + this.maxValue = builder.maxValue; + } + + private static class Builder { + + long maxValue = Long.MAX_VALUE; + long minValue = Long.MIN_VALUE; + long numAllocated = 100; + int incrementBy = 1; + int startsWith = 1; + int cacheSize = 100; + + public Builder numAllocated(long numAllocated) { + this.numAllocated = numAllocated; + return this; + } + + public Builder startsWith(int startsWith) { + this.startsWith = startsWith; + return this; + } + + public Builder cacheSize(int cacheSize) { + this.cacheSize = cacheSize; + return this; + } + + public Builder incrementBy(int incrementBy) { + this.incrementBy = incrementBy; + return this; + } + + public Builder minValue(long minValue) { + this.minValue = minValue; + return this; + } + + public Builder maxValue(long maxValue) { + this.maxValue = maxValue; + return this; + } + + public SequenceProperties build() { + return new SequenceProperties(this); + } + + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index ca5e778..10fda68 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -903,7 +903,10 @@ term returns [ParseNode ret] } | (n=NEXT | CURRENT) VALUE FOR s=from_table_name { contextStack.peek().hasSequences(true); - $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s); } + $ret = n==null ? factory.currentValueFor(s) : factory.nextValueFor(s, null); } + | (n=NEXT) lorb=literal_or_bind VALUES FOR s=from_table_name + { contextStack.peek().hasSequences(true); + $ret = factory.nextValueFor(s, lorb); } ; one_or_more_expressions returns [List<ParseNode> ret] http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java index ede6d9c..5ec8cd2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java @@ -24,16 +24,23 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.parse.SequenceValueParseNode.Op; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.tuple.DelegateTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.SequenceUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,7 +48,7 @@ import com.google.common.collect.Maps; public class SequenceManager { private final PhoenixStatement statement; private int[] sequencePosition; - private List<SequenceKey> nextSequences; + private List<SequenceAllocation> nextSequences; private List<SequenceKey> currentSequences; private final Map<SequenceKey,SequenceValueExpression> sequenceMap = Maps.newHashMap(); private final BitSet isNextSequence = new BitSet(); @@ -113,20 +120,30 @@ public class SequenceManager { } } - public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) { + public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException { PName tenantName = statement.getConnection().getTenantId(); String tenantId = tenantName == null ? null : tenantName.getString(); TableName tableName = node.getTableName(); int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets(); + ParseNode numToAllocateNode = node.getNumToAllocateNode(); + + long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode); SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets); SequenceValueExpression expression = sequenceMap.get(key); if (expression == null) { int index = sequenceMap.size(); - expression = new SequenceValueExpression(key, node.getOp(), index); + expression = new SequenceValueExpression(key, node.getOp(), index, numToAllocate); sequenceMap.put(key, expression); - } else if (expression.op != node.getOp()){ - expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex()); - } + } else if (expression.op != node.getOp() || expression.getNumToAllocate() < numToAllocate) { + // Keep the maximum allocation size we see in a statement + SequenceValueExpression oldExpression = expression; + expression = new SequenceValueExpression(key, node.getOp(), expression.getIndex(), Math.max(expression.getNumToAllocate(), numToAllocate)); + if (oldExpression.getNumToAllocate() < numToAllocate) { + // If we found a NEXT VALUE expression with a higher number to allocate + // We override the original expression + sequenceMap.put(key, expression); + } + } // If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT if (node.getOp() == Op.NEXT_VALUE) { isNextSequence.set(expression.getIndex()); @@ -134,6 +151,38 @@ public class SequenceManager { return expression; } + + /** + * If caller specified used NEXT <n> VALUES FOR <seq> expression then we have set the numToAllocate. + * If numToAllocate is > 1 we treat this as a bulk reservation of a block of sequence slots. + * + * @throws a SQLException if we can't compile the expression + */ + private long determineNumToAllocate(TableName sequenceName, ParseNode numToAllocateNode) + throws SQLException { + + if (numToAllocateNode != null) { + final StatementContext context = new StatementContext(statement); + ExpressionCompiler expressionCompiler = new ExpressionCompiler(context); + Expression expression = numToAllocateNode.accept(expressionCompiler); + ImmutableBytesWritable ptr = context.getTempPtr(); + expression.evaluate(null, ptr); + if (ptr.getLength() == 0 || !expression.getDataType().isCoercibleTo(PLong.INSTANCE)) { + throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT); + } + + // Parse <n> and make sure it is greater than 0. We don't support allocating 0 or negative values! + long numToAllocate = (long) PLong.INSTANCE.toObject(ptr, expression.getDataType()); + if (numToAllocate < 1) { + throw SequenceUtil.getException(sequenceName.getSchemaName(), sequenceName.getTableName(), SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT); + } + return numToAllocate; + + } else { + // Standard Sequence Allocation Behavior + return SequenceUtil.DEFAULT_NUM_SLOTS_TO_ALLOCATE; + } + } public void validateSequences(Sequence.ValueOp action) throws SQLException { if (sequenceMap.isEmpty()) { @@ -146,14 +195,17 @@ public class SequenceManager { currentSequences = Lists.newArrayListWithExpectedSize(maxSize); for (Map.Entry<SequenceKey, SequenceValueExpression> entry : sequenceMap.entrySet()) { if (isNextSequence.get(entry.getValue().getIndex())) { - nextSequences.add(entry.getKey()); + nextSequences.add(new SequenceAllocation(entry.getKey(), entry.getValue().getNumToAllocate())); } else { currentSequences.add(entry.getKey()); } } long[] srcSequenceValues = new long[nextSequences.size()]; SQLException[] sqlExceptions = new SQLException[nextSequences.size()]; + + // Sort the next sequences to prevent deadlocks Collections.sort(nextSequences); + // Create reverse indexes for (int i = 0; i < nextSequences.size(); i++) { sequencePosition[i] = sequenceMap.get(nextSequences.get(i)).getIndex(); @@ -168,4 +220,6 @@ public class SequenceManager { services.validateSequences(nextSequences, timestamp, srcSequenceValues, sqlExceptions, action); setSequenceValues(srcSequenceValues, dstSequenceValues, sqlExceptions); } -} + +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java index cdaae68..71e2d02 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceValueExpression.java @@ -32,13 +32,23 @@ public class SequenceValueExpression extends BaseTerminalExpression { private final SequenceKey key; final Op op; private final int index; + private final long numToAllocate; - public SequenceValueExpression(SequenceKey key, Op op, int index) { + public SequenceValueExpression(SequenceKey key, Op op, int index, long numToAllocate) { this.key = key; this.op = op; this.index = index; + this.numToAllocate = numToAllocate; } + public long getNumToAllocate() { + return numToAllocate; + } + + public SequenceKey getKey() { + return key; + } + public int getIndex() { return index; } @@ -73,7 +83,7 @@ public class SequenceValueExpression extends BaseTerminalExpression { @Override public String toString() { - return op.getName() + " VALUE FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName()); + return op.getName() + (numToAllocate == 1 ? " VALUE " : (" " + numToAllocate + " VALUES " )) + "FOR " + SchemaUtil.getTableName(key.getSchemaName(),key.getSequenceName()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 9b5f040..bb6c4b5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -41,17 +41,17 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PBoolean; -import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; -import org.apache.phoenix.schema.Sequence; -import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SequenceUtil; @@ -77,6 +77,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION"; public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE"; public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE"; + public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE"; private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS)); private static Result getErrorResult(byte[] row, long timestamp, int errorCode) { @@ -138,10 +139,8 @@ public class SequenceRegionObserver extends BaseRegionObserver { if (result.isEmpty()) { return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode()); } - if (validateOnly) { - return result; - } + KeyValue currentValueKV = Sequence.getCurrentValueKV(result); KeyValue incrementByKV = Sequence.getIncrementByKV(result); KeyValue cacheSizeKV = Sequence.getCacheSizeKV(result); @@ -224,6 +223,28 @@ public class SequenceRegionObserver extends BaseRegionObserver { cycleKV.getValueOffset(), cycleKV.getValueLength()); } + long numSlotsToAllocate = calculateNumSlotsToAllocate(increment); + + // We don't support Bulk Allocations on sequences that have the CYCLE flag set to true + if (cycle && !SequenceUtil.isCycleAllowed(numSlotsToAllocate)) { + return getErrorResult(row, maxTimestamp, SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED.getErrorCode()); + } + + // Bulk Allocations are expressed by NEXT <n> VALUES FOR + if (SequenceUtil.isBulkAllocation(numSlotsToAllocate)) { + if (SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate)) { + // If we try to allocate more slots than the limit we return an error. + // Allocating sequence values in bulk should be an all or nothing operation. + // If the operation succeeds clients are guaranteed that they have reserved + // all the slots requested. + return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode()); + } + } + + if (validateOnly) { + return result; + } + // return if we have run out of sequence values if (limitReached) { if (cycle) { @@ -231,16 +252,15 @@ public class SequenceRegionObserver extends BaseRegionObserver { currentValue = increasingSeq ? minValue : maxValue; } else { - SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE - : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE; - return getErrorResult(row, maxTimestamp, code.getErrorCode()); + return getErrorResult(row, maxTimestamp, SequenceUtil.getLimitReachedErrorCode(increasingSeq).getErrorCode()); } } - + // check if the limit was reached - limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize); - // update currentValue - currentValue += incrementBy * cacheSize; + limitReached = SequenceUtil.checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize, numSlotsToAllocate); + + // update currentValue + currentValue += incrementBy * (SequenceUtil.isBulkAllocation(numSlotsToAllocate) ? numSlotsToAllocate : cacheSize); // update the currentValue of the Result row KeyValue newCurrentValueKV = createKeyValue(row, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, currentValue, timestamp); Sequence.replaceCurrentValueKV(cells, newCurrentValueKV); @@ -264,6 +284,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { region.closeRegionOperation(); } } + /** * Creates a new KeyValue for a long value @@ -417,5 +438,20 @@ public class SequenceRegionObserver extends BaseRegionObserver { region.closeRegionOperation(); } } + + /** + * Determines whether a request for incrementing the sequence was a bulk allocation and if so + * what the number of slots to allocate is. This is triggered by the NEXT <n> VALUES FOR expression. + * For backwards compatibility with older clients, we default the value to 1 which preserves + * existing behavior when invoking NEXT VALUE FOR. + */ + private long calculateNumSlotsToAllocate(final Increment increment) { + long numToAllocate = 1; + byte[] numToAllocateBytes = increment.getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE); + if (numToAllocateBytes != null) { + numToAllocate = Bytes.toLong(numToAllocateBytes); + } + return numToAllocate; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index cc8b02a..b9f81fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -276,6 +276,8 @@ public enum SQLExceptionCode { SEQUENCE_VAL_REACHED_MAX_VALUE(1212, "42Z12", "Reached MAXVALUE of sequence"), SEQUENCE_VAL_REACHED_MIN_VALUE(1213, "42Z13", "Reached MINVALUE of sequence"), INCREMENT_BY_MUST_NOT_BE_ZERO(1214, "42Z14", "Sequence INCREMENT BY value cannot be zero"), + NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT(1215, "42Z15", "Sequence NEXT n VALUES FOR must be a postive integer or constant." ), + NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED(1216, "42Z16", "Sequence NEXT n VALUES FOR is not supported for Sequences with the CYCLE flag" ), /** Parser error. (errorcode 06, sqlState 42P) */ PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYTAX_ERROR), http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index f323ec4..9689589 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -232,7 +232,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException { - QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); + QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); return connection.getQueryServices().getOptimizer().optimize(this, plan); } @@ -245,7 +245,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho public PhoenixResultSet call() throws SQLException { final long startTime = System.currentTimeMillis(); try { - QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); + QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); plan = connection.getQueryServices().getOptimizer().optimize( PhoenixStatement.this, plan); // this will create its own trace internally, so we don't wrap this @@ -303,7 +303,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho // since they'd update data directly from coprocessors, and should thus operate on // the latest state try { - MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); + MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); MutationState state = plan.execute(); connection.getMutationState().join(state); if (connection.getAutoCommit()) { @@ -1203,14 +1203,14 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho if (stmt.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(query); } - return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); + return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); } public MutationPlan compileMutation(CompilableStatement stmt, String query) throws SQLException { if (!stmt.getOperation().isMutation()) { throw new ExecuteUpdateNotApplicableException(query); } - return stmt.compilePlan(this, Sequence.ValueOp.RESERVE_SEQUENCE); + return stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); } public MutationPlan compileMutation(String sql) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 44359a7..49b14c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -310,11 +310,11 @@ public class ParseNodeFactory { } public SequenceValueParseNode currentValueFor(TableName tableName) { - return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE); + return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.CURRENT_VALUE, null); } - public SequenceValueParseNode nextValueFor(TableName tableName) { - return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE); + public SequenceValueParseNode nextValueFor(TableName tableName, ParseNode numToAllocateNode) { + return new SequenceValueParseNode(tableName, SequenceValueParseNode.Op.NEXT_VALUE, numToAllocateNode); } public AddColumnStatement addColumn(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, ListMultimap<String,Pair<String,Object>> props) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java index a5d60fe..1fc670c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SequenceValueParseNode.java @@ -39,10 +39,16 @@ public class SequenceValueParseNode extends TerminalParseNode { } private final TableName tableName; private final Op op; + private final ParseNode numToAllocate; - public SequenceValueParseNode(TableName tableName, Op op) { + public SequenceValueParseNode(TableName tableName, Op op, ParseNode numToAllocate) { this.tableName = tableName; this.op = op; + this.numToAllocate = numToAllocate; + } + + public ParseNode getNumToAllocateNode() { + return numToAllocate; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/3b1bfa0d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index dc51b10..3b53309 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -40,6 +40,7 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.PTableStats; @@ -91,8 +92,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated long createSequence(String tenantId, String schemaName, String sequenceName, long startWith, long incrementBy, long cacheSize, long minValue, long maxValue, boolean cycle, long timestamp) throws SQLException; long dropSequence(String tenantId, String schemaName, String sequenceName, long timestamp) throws SQLException; - void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException; - void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp, long[] values, SQLException[] exceptions) throws SQLException; + void validateSequences(List<SequenceAllocation> sequenceAllocations, long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException; + void incrementSequences(List<SequenceAllocation> sequenceAllocation, long timestamp, long[] values, SQLException[] exceptions) throws SQLException; long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException; void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] exceptions) throws SQLException;