PHOENIX-6 Support ON DUPLICATE KEY construct
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2325a41 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2325a41 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2325a41 Branch: refs/heads/encodecolumns2 Commit: e2325a413d2b44f1432b30b7fd337643793cbd21 Parents: 613a5b7 Author: James Taylor <jamestay...@apache.org> Authored: Thu Oct 27 11:20:20 2016 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Thu Oct 27 14:03:28 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/OnDuplicateKeyIT.java | 523 +++++++++++++++++++ .../phoenix/end2end/index/IndexTestUtil.java | 6 +- .../org/apache/phoenix/tx/TransactionIT.java | 15 + phoenix-core/src/main/antlr3/PhoenixSQL.g | 24 +- .../apache/phoenix/compile/DeleteCompiler.java | 6 +- .../apache/phoenix/compile/UpsertCompiler.java | 104 +++- .../UngroupedAggregateRegionObserver.java | 2 +- .../phoenix/exception/SQLExceptionCode.java | 6 + .../apache/phoenix/execute/MutationState.java | 32 +- .../org/apache/phoenix/hbase/index/Indexer.java | 98 +++- .../hbase/index/builder/BaseIndexBuilder.java | 14 +- .../hbase/index/builder/IndexBuildManager.java | 10 + .../hbase/index/builder/IndexBuilder.java | 29 +- .../phoenix/hbase/index/covered/IndexCodec.java | 1 - .../hbase/index/util/KeyValueBuilder.java | 15 +- .../phoenix/index/PhoenixIndexBuilder.java | 318 +++++++++++ .../apache/phoenix/jdbc/PhoenixStatement.java | 11 +- .../apache/phoenix/parse/ParseNodeFactory.java | 7 +- .../apache/phoenix/parse/UpsertStatement.java | 10 +- .../apache/phoenix/schema/DelegateColumn.java | 10 + .../apache/phoenix/schema/DelegateTable.java | 18 +- .../org/apache/phoenix/schema/PColumnImpl.java | 12 +- .../java/org/apache/phoenix/schema/PRow.java | 11 +- .../java/org/apache/phoenix/schema/PTable.java | 6 +- .../org/apache/phoenix/schema/PTableImpl.java | 48 +- 
.../org/apache/phoenix/util/ExpressionUtil.java | 1 - .../phoenix/compile/QueryCompilerTest.java | 104 +++- 27 files changed, 1321 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java new file mode 100644 index 0000000..9a81026 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKeyIT.java @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class OnDuplicateKeyIT extends ParallelStatsDisabledIT { + private final String indexDDL; + + public OnDuplicateKeyIT(String indexDDL) { + this.indexDDL = indexDDL; + } + + @Parameters + public static Collection<Object> data() { + List<Object> testCases = Lists.newArrayList(); + testCases.add(new String[] { + "", + }); + testCases.add(new String[] { + "create index %s_IDX on %s(counter1) include (counter2)", + }); + testCases.add(new String[] { + "create index %s_IDX on %s(counter1, counter2)", + }); + testCases.add(new String[] { + "create local index %s_IDX on %s(counter1) include (counter2)", + }); + testCases.add(new String[] { + "create local index %s_IDX on %s(counter1, counter2)", + }); + return testCases; + } + + private void createIndex(Connection conn, String tableName) throws SQLException { + if (indexDDL == null || indexDDL.length() == 0) { + return; + } + String ddl = String.format(indexDDL, tableName, tableName); + conn.createStatement().execute(ddl); + } + + @Test + public void testNewAndUpdateOnSingleNumericColumn() throws Exception { + Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 smallint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"; + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(0,rs.getLong(2)); + assertFalse(rs.next()); + + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(1,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testNewAndUpdateOnSingleNumericColumnWithOtherColumns() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(k1 varchar, k2 varchar, counter1 varchar, counter2 date, other1 char(3), other2 varchar default 'f', constraint pk primary key (k1,k2))"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " + + "ON DUPLICATE KEY UPDATE counter1 = counter1 || CASE WHEN LENGTH(counter1) < 10 THEN 'SMALL' ELSE 'LARGE' END || k2 || other2 || other1 "; + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + 
assertEquals("b",rs.getString(2)); + assertEquals("c",rs.getString(3)); + assertEquals(null,rs.getDate(4)); + assertEquals("eee",rs.getString(5)); + assertEquals("f",rs.getString(6)); + assertFalse(rs.next()); + + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("b",rs.getString(2)); + assertEquals("cSMALLbfeee",rs.getString(3)); + assertEquals(null,rs.getDate(4)); + assertEquals("eee",rs.getString(5)); + assertEquals("f",rs.getString(6)); + assertFalse(rs.next()); + + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("b",rs.getString(2)); + assertEquals("cSMALLbfeeeLARGEbfeee",rs.getString(3)); + assertEquals(null,rs.getDate(4)); + assertEquals("eee",rs.getString(5)); + assertEquals("f",rs.getString(6)); + assertFalse(rs.next()); + + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a','b','c',null,'eee') " + + "ON DUPLICATE KEY UPDATE counter1 = to_char(rand()), counter2 = current_date() + 1"); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("b",rs.getString(2)); + double d = Double.parseDouble(rs.getString(3)); + assertTrue(d >= 0.0 && d <= 1.0); + Date date = rs.getDate(4); + assertTrue(date.after(new Date(System.currentTimeMillis()))); + assertEquals("eee",rs.getString(5)); + assertEquals("f",rs.getString(6)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testNewAndUpdateOnSingleVarcharColumn() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + 
String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = counter1 || 'b'"; + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("b",rs.getString(2)); + assertFalse(rs.next()); + + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE substr(counter1,1,1) = 'b'"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("bb",rs.getString(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testDeleteOnSingleVarcharColumnAutoCommit() throws Exception { + testDeleteOnSingleVarcharColumn(true); + } + + @Test + public void testDeleteOnSingleVarcharColumnNoAutoCommit() throws Exception { + testDeleteOnSingleVarcharColumn(false); + } + + private void testDeleteOnSingleVarcharColumn(boolean autoCommit) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autoCommit); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 varchar, counter2 smallint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a','b') ON DUPLICATE KEY UPDATE counter1 = null"; + conn.createStatement().execute(dml); + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + 
assertEquals("a",rs.getString(1)); + assertEquals(null,rs.getString(2)); + assertFalse(rs.next()); + + dml = "UPSERT INTO " + tableName + " VALUES('a','b',0)"; + conn.createStatement().execute(dml); + dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = null, counter2 = counter2 + 1"; + conn.createStatement().execute(dml); + dml = "UPSERT INTO " + tableName + " VALUES('a','b', 0) ON DUPLICATE KEY UPDATE counter1 = 'c', counter2 = counter2 + 1"; + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals("c",rs.getString(2)); + assertEquals(2,rs.getInt(3)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testIgnoreOnSingleColumn() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)"); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(10,rs.getLong(2)); + assertFalse(rs.next()); + + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(10,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testInitialIgnoreWithUpdateOnSingleColumn() throws 
Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + // Test ignore combined with update in same commit batch for new record + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY IGNORE"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(11,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testOverrideOnDupKeyUpdateWithUpsert() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 bigint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + // Test upsert overriding ON DUPLICATE KEY entries + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',2) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)"); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName 
+ " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(10,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testNewAndMultiUpdateOnSingleColumnAutoCommit() throws Exception { + testNewAndMultiUpdateOnSingleColumn(true); + } + + @Test + public void testNewAndMultiUpdateOnSingleColumnNoAutoCommit() throws Exception { + testNewAndMultiUpdateOnSingleColumn(false); + } + + private void testNewAndMultiUpdateOnSingleColumn(boolean autoCommit) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autoCommit); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 integer)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',5) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); // no impact + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); // VALUES ignored + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(2,rs.getLong(2)); + assertFalse(rs.next()); + + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + 
conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2"); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(9,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testNewAndMultiDifferentUpdateOnSingleColumn() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 decimal)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"; + conn.createStatement().execute(dml); + dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 2"; + conn.createStatement().execute(dml); + dml = "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"; + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(3,rs.getLong(2)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testNewAndMultiDifferentUpdateOnMultipleColumnsAutoCommit() throws Exception { + testNewAndMultiDifferentUpdateOnMultipleColumns(true); + } + + @Test + public void 
testNewAndMultiDifferentUpdateOnMultipleColumnsNoAutoCommit() throws Exception { + testNewAndMultiDifferentUpdateOnMultipleColumns(false); + } + + private void testNewAndMultiDifferentUpdateOnMultipleColumns(boolean autoCommit) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(autoCommit); + String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 bigint, counter2 tinyint)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + String dml = "UPSERT INTO " + tableName + " VALUES('a',0,0) ON DUPLICATE KEY UPDATE counter1 = counter2 + 1, counter2 = counter1 + 2"; + conn.createStatement().execute(dml); + conn.createStatement().execute(dml); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(1,rs.getLong(2)); + assertEquals(2,rs.getLong(3)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(1,rs.getLong(2)); + assertEquals(2,rs.getLong(3)); + assertFalse(rs.next()); + + conn.createStatement().execute(dml); + conn.commit(); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(3,rs.getLong(2)); + assertEquals(3,rs.getLong(3)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(3,rs.getLong(2)); + assertEquals(3,rs.getLong(3)); + assertFalse(rs.next()); + + conn.close(); + } + + @Test + public void testAtomicUpdate() 
throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + final String tableName = generateUniqueName(); + String ddl = " create table " + tableName + "(pk varchar primary key, counter1 integer, counter2 integer)"; + conn.createStatement().execute(ddl); + createIndex(conn, tableName); + int nThreads = 10; + final int[] resultHolder = new int[1]; + final int nCommits = 100; + final int nIncrementsPerCommit = 2; + ExecutorService exec = Executors.newFixedThreadPool(nThreads); + List<Future> futures = Lists.newArrayListWithExpectedSize(nThreads); + Connection[] connections = new Connection[nThreads]; + for (int i = 0; i < nThreads; i++) { + connections[i] = DriverManager.getConnection(getUrl(), props); + } + for (int i = 0; i < nThreads; i++) { + final Connection myConn = connections[i]; + futures.add(exec.submit(new Runnable() { + @Override + public void run() { + String dml = "UPSERT INTO " + tableName + " VALUES('a',1) ON DUPLICATE KEY UPDATE counter1 = counter1 + 1"; + try { + for (int j = 0; j < nCommits; j++) { + for (int k = 0; k < nIncrementsPerCommit; k++) { + myConn.createStatement().execute(dml); + resultHolder[0]++; + } + myConn.commit(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + })); + } + Collections.shuffle(futures); + for (Future future : futures) { + future.get(); + } + exec.shutdownNow(); + + int finalResult = nThreads * nCommits * nIncrementsPerCommit; + //assertEquals(finalResult,resultHolder[0]); + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(finalResult,rs.getInt(2)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE counter1 >= 0"); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + 
assertEquals(finalResult,rs.getInt(2)); + assertFalse(rs.next()); + + conn.close(); + } + +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java index ba04ad7..e854f23 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java @@ -43,11 +43,11 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; @@ -125,7 +125,7 @@ public class IndexTestUtil { long ts = MetaDataUtil.getClientTimeStamp(dataMutation); if (dataMutation instanceof Delete && dataMutation.getFamilyCellMap().values().isEmpty()) { indexTable.newKey(ptr, indexValues); - row = indexTable.newRow(builder, ts, ptr); + row = indexTable.newRow(builder, ts, ptr, false); row.delete(); } else { // If no column families in table, then nothing to look for @@ -153,7 +153,7 @@ public class IndexTestUtil { } } indexTable.newKey(ptr, indexValues); - row = indexTable.newRow(builder, ts, ptr); + row = indexTable.newRow(builder, ts, ptr, false); int pos = 0; while ((pos = indexValuesSet.nextSetBit(pos)) >= 0) { int index = nIndexColumns + indexOffset + 
pos++; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 2e45d5a..83128f1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -698,4 +698,19 @@ public class TransactionIT extends ParallelStatsDisabledIT { } } + + + @Test + public void testOnDupKeyForTransactionalTable() throws Exception { + // TODO: we should support having a transactional table defined for a connectionless connection + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); + conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index fa1e9db..1d1a873 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -135,6 +135,8 @@ tokens EXECUTE = 'execute'; UPGRADE = 'upgrade'; DEFAULT = 'default'; + DUPLICATE = 'duplicate'; + IGNORE = 'ignore'; } @@ -707,10 +709,26 @@ finally{ contextStack.pop(); } upsert_node returns [UpsertStatement ret] : UPSERT (hint=hintClause)? 
INTO t=from_table_name (LPAREN p=upsert_column_refs RPAREN)? - ((VALUES LPAREN v=one_or_more_expressions RPAREN) | s=select_node) - {ret = factory.upsert(factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), v, s, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); } - ; + ((VALUES LPAREN v=one_or_more_expressions RPAREN ( ON DUPLICATE KEY ( ig=IGNORE | ( UPDATE pairs=update_column_pairs ) ) )? ) | s=select_node) + {ret = factory.upsert( + factory.namedTable(null,t,p == null ? null : p.getFirst()), + hint, p == null ? null : p.getSecond(), + v, s, getBindCount(), + new HashMap<String, UDFParseNode>(udfParseNodes), + ig != null ? Collections.<Pair<ColumnName,ParseNode>>emptyList() : pairs != null ? pairs : null); } + ; + +update_column_pairs returns [ List<Pair<ColumnName,ParseNode>> ret] +@init{ret = new ArrayList<Pair<ColumnName,ParseNode>>(); } + : p=update_column_pair { ret.add(p); } + (COMMA p=update_column_pair { ret.add(p); } )* +; + +update_column_pair returns [ Pair<ColumnName,ParseNode> ret ] + : c=column_name EQ e=expression { $ret = new Pair<ColumnName,ParseNode>(c,e); } +; + upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret] @init{ret = new Pair<List<ColumnDef>,List<ColumnName>>(new ArrayList<ColumnDef>(), new ArrayList<ColumnName>()); } : d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index e0881cf..602cd6b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -159,11 +159,11 @@ public class DeleteCompiler { } // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the // row key will already have its value. - mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); for (int i = 0; i < indexTableRefs.size(); i++) { ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map rs.getCurrentRow().getKey(indexPtr); - indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -499,7 +499,7 @@ public class DeleteCompiler { Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); while (iterator.hasNext()) { - mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO)); + mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } return new MutationState(tableRef, mutation, 0, maxSize, 
connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 1caf7be..85517a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -27,6 +27,7 @@ import java.sql.Timestamp; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -52,6 +54,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -75,6 +78,7 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.ConstraintViolationException; +import org.apache.phoenix.schema.DelegateColumn; import org.apache.phoenix.schema.IllegalDataException; import 
org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataEntityNotFoundException; @@ -96,6 +100,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PUnsignedLong; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -107,10 +112,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class UpsertCompiler { + private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, - byte[][] viewConstants) throws SQLException { + byte[][] viewConstants, byte[] onDupKeyBytes) throws SQLException { Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -154,7 +160,7 @@ public class UpsertCompiler { ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length)); } } - mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo)); + mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, @@ -208,7 +214,7 @@ public class UpsertCompiler { table.rowKeyOrderOptimizable()); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, 
indexMaintainer, viewConstants); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { @@ -869,6 +875,85 @@ public class UpsertCompiler { constantExpressions.add(expression); nodeIndex++; } + byte[] onDupKeyBytesToBe = null; + List<Pair<ColumnName,ParseNode>> onDupKeyPairs = upsert.getOnDupKeyPairs(); + if (onDupKeyPairs != null) { + if (table.isImmutableRows()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build().buildException(); + } + if (table.isTransactional()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build().buildException(); + } + if (connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build().buildException(); + } + if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE + onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyIgnore(); + } else { // ON DUPLICATE KEY UPDATE + int position = 1; + UpdateColumnCompiler compiler = new UpdateColumnCompiler(context); + int nColumns = onDupKeyPairs.size(); + List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns); + LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1); + updateColumns.add(new PColumnImpl( + table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others + null, PVarbinary.INSTANCE, null, null, false, 0, 
SortOrder.getDefault(), 0, null, false, null, false, false)); + for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) { + ColumnName colName = columnPair.getFirst(); + PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn(); + if (SchemaUtil.isPKColumn(updateColumn)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_PK_ON_DUP_KEY) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .setColumnName(updateColumn.getName().getString()) + .build().buildException(); + } + final int columnPosition = position++; + if (!updateColumns.add(new DelegateColumn(updateColumn) { + @Override + public int getPosition() { + return columnPosition; + } + })) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.DUPLICATE_COLUMN_IN_ON_DUP_KEY) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .setColumnName(updateColumn.getName().getString()) + .build().buildException(); + }; + ParseNode updateNode = columnPair.getSecond(); + compiler.setColumn(updateColumn); + Expression updateExpression = updateNode.accept(compiler); + // Check that updateExpression is coercible to updateColumn + if (updateExpression.getDataType() != null && !updateExpression.getDataType().isCastableTo(updateColumn.getDataType())) { + throw TypeMismatchException.newException( + updateExpression.getDataType(), updateColumn.getDataType(), "expression: " + + updateExpression.toString() + " for column " + updateColumn); + } + if (compiler.isAggregate()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .setColumnName(updateColumn.getName().getString()) + .build().buildException(); + } + updateExpressions.add(updateExpression); + } + PTable onDupKeyTable = PTableImpl.makePTable(table, 
updateColumns); + onDupKeyBytesToBe = PhoenixIndexBuilder.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions); + } + } + final byte[] onDupKeyBytes = onDupKeyBytesToBe; + return new MutationPlan() { @Override public ParameterMetaData getParameterMetaData() { @@ -958,7 +1043,7 @@ public class UpsertCompiler { indexMaintainer = table.getIndexMaintainer(parentTable, connection); viewConstants = IndexUtil.getViewConstants(parentTable); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes); return new MutationState(tableRef, mutation, 0, maxSize, connection); } @@ -1004,10 +1089,10 @@ public class UpsertCompiler { return upsertRef; } - private static final class UpsertValuesCompiler extends ExpressionCompiler { + private static class UpdateColumnCompiler extends ExpressionCompiler { private PColumn column; - private UpsertValuesCompiler(StatementContext context) { + private UpdateColumnCompiler(StatementContext context) { super(context); } @@ -1032,7 +1117,12 @@ public class UpsertCompiler { } return super.visit(node); } - + } + + private static class UpsertValuesCompiler extends UpdateColumnCompiler { + private UpsertValuesCompiler(StatementContext context) { + super(context); + } @Override public Expression visit(SequenceValueParseNode node) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index f09a20f..9fd59ae 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -535,7 +535,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } projectedTable.newKey(ptr, values); - PRow row = projectedTable.newRow(kvBuilder, ts, ptr); + PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false); for (; i < projectedColumns.size(); i++) { Expression expression = selectExpressions.get(i); if (expression.evaluate(result, ptr)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 2346224..ac5619f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -326,6 +326,12 @@ public enum SQLExceptionCode { return new SequenceNotFoundException(info.getSchemaName(), info.getTableName()); } }), + CANNOT_UPDATE_PK_ON_DUP_KEY(1218, "42Z18", "Primary key columns may not be udpated in ON DUPLICATE KEY UPDATE clause." ), + CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE(1219, "42Z19", "The ON DUPLICATE KEY UPDATE clause may not be used for immutable tables." ), + CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL(1220, "42Z20", "The ON DUPLICATE KEY UPDATE clause may not be used for transactional tables." ), + DUPLICATE_COLUMN_IN_ON_DUP_KEY(1221, "42Z21", "Duplicate column in ON DUPLICATE KEY UPDATE." ), + AGGREGATION_NOT_ALLOWED_IN_ON_DUP_KEY(1222, "42Z22", "Aggregation in ON DUPLICATE KEY UPDATE is not allowed." 
), + CANNOT_SET_SCN_IN_ON_DUP_KEY(1223, "42Z23", "The CURRENT_SCN may not be set for statement using ON DUPLICATE KEY." ), /** Parser error. (errorcode 06, sqlState 42P) */ PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYNTAX_ERROR), http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 262f263..9d1344b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -55,6 +55,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; +import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; @@ -620,6 +621,8 @@ public class MutationState implements SQLCloseable { long timestampToUse = timestamp; while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); + byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes(); + boolean hasOnDupKey = onDupKeyBytes != null; ImmutableBytesPtr key = rowEntry.getKey(); RowMutationState state = rowEntry.getValue(); if (tableWithRowTimestampCol) { @@ -635,7 +638,7 @@ public class MutationState implements SQLCloseable { } PRow row = tableRef.getTable() - .newRow(connection.getKeyValueBuilder(), timestampToUse, key); + .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey); List<Mutation> rowMutations, 
rowMutationsPertainingToIndex; if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete row.delete(); @@ -650,6 +653,15 @@ public class MutationState implements SQLCloseable { row.setValue(valueEntry.getKey(), valueEntry.getValue()); } rowMutations = row.toRowMutations(); + // Pass through ON DUPLICATE KEY info through mutations + // In the case of the same clause being used on many statements, this will be + // inefficient because we're transmitting the same information for each mutation. + // TODO: use our ServerCache + for (Mutation mutation : rowMutations) { + if (onDupKeyBytes != null) { + mutation.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB, onDupKeyBytes); + } + } rowMutationsPertainingToIndex = rowMutations; } mutationList.addAll(rowMutations); @@ -1452,15 +1464,22 @@ public class MutationState implements SQLCloseable { @Nonnull private Map<PColumn,byte[]> columnValues; private int[] statementIndexes; @Nonnull private final RowTimestampColInfo rowTsColInfo; + private byte[] onDupKeyBytes; - public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) { + public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, + byte[] onDupKeyBytes) { checkNotNull(columnValues); checkNotNull(rowTsColInfo); this.columnValues = columnValues; this.statementIndexes = new int[] {statementIndex}; this.rowTsColInfo = rowTsColInfo; + this.onDupKeyBytes = onDupKeyBytes; } + byte[] getOnDupKeyBytes() { + return onDupKeyBytes; + } + Map<PColumn, byte[]> getColumnValues() { return columnValues; } @@ -1470,7 +1489,14 @@ public class MutationState implements SQLCloseable { } void join(RowMutationState newRow) { - getColumnValues().putAll(newRow.getColumnValues()); + // If we already have a row and the new row has an ON DUPLICATE KEY clause + // ignore the new values (as that's what the server will do). 
+ if (newRow.onDupKeyBytes == null) { + getColumnValues().putAll(newRow.getColumnValues()); + } + // Concatenate ON DUPLICATE KEY bytes to allow multiple + // increments of the same row in the same commit batch. + this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes); statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index eb5d3a8..84c8d7d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,23 +33,28 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import 
org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.UserGroupInformation; @@ -61,15 +67,16 @@ import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.hbase.index.wal.IndexedKeyValue; import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.ServerUtil; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; + import com.google.common.collect.Multimap; /** @@ -189,6 +196,45 @@ public class Indexer extends BaseRegionObserver { this.recoveryWriter.stop(msg); } + /** + * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing + * sets up the necessary locks and mvcc to allow an atomic update. 
The Increment is not a + * real increment, though, it's really more of a Put. We translate the Increment into a + * list of mutations, at most a single Put and Delete that are the changes upon executing + * the list of ON DUPLICATE KEY clauses for this row. + */ + @Override + public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e, + final Increment inc) throws IOException { + try { + List<Mutation> mutations = this.builder.executeAtomicOp(inc); + if (mutations == null) { + return null; + } + + // Causes the Increment to be ignored as we're committing the mutations + // ourselves below. + e.bypass(); + e.complete(); + // ON DUPLICATE KEY IGNORE will return empty list if row already exists + // as no action is required in that case. + if (!mutations.isEmpty()) { + HRegion region = e.getEnvironment().getRegion(); + // Otherwise, submit the mutations directly here + region.mutateRowsWithLocks( + mutations, + Collections.<byte[]>emptyList(), // Rows are already locked + HConstants.NO_NONCE, HConstants.NO_NONCE); + } + return Result.EMPTY_RESULT; + } catch (Throwable t) { + throw ServerUtil.createIOException( + "Unable to process ON DUPLICATE IGNORE for " + + e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + + "(" + Bytes.toStringBinary(inc.getRow()) + ")", t); + } + } + @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { @@ -206,13 +252,15 @@ public class Indexer extends BaseRegionObserver { "Somehow didn't return an index update but also didn't propagate the failure to the client!"); } + private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS); + public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { // first group all the updates for a single row into a 
single update to be processed Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); - + Durability defaultDurability = Durability.SYNC_WAL; if(c.getEnvironment().getRegion() != null) { defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability(); @@ -222,33 +270,35 @@ public class Indexer extends BaseRegionObserver { Durability durability = Durability.SKIP_WAL; for (int i = 0; i < miniBatchOp.size(); i++) { Mutation m = miniBatchOp.getOperation(i); + if (this.builder.isAtomicOp(m)) { + miniBatchOp.setOperationStatus(i, SUCCESS); + continue; + } // skip this mutation if we aren't enabling indexing // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) // should be indexed, which means we need to expose another method on the builder. Such is the // way optimizations go, though. - if (!this.builder.isEnabled(m)) { - continue; - } - - Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? - defaultDurability : m.getDurability(); - if (effectiveDurablity.ordinal() > durability.ordinal()) { - durability = effectiveDurablity; - } - - // add the mutation to the batch set - ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - MultiMutation stored = mutations.get(row); - // we haven't seen this row before, so add it - if (stored == null) { - stored = new MultiMutation(row); - mutations.put(row, stored); + if (this.builder.isEnabled(m)) { + Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? 
+ defaultDurability : m.getDurability(); + if (effectiveDurablity.ordinal() > durability.ordinal()) { + durability = effectiveDurablity; + } + + // add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); } - stored.addAll(m); } // early exit if it turns out we don't have any edits - if (mutations.entrySet().size() == 0) { + if (mutations.isEmpty()) { return; } @@ -360,7 +410,7 @@ public class Indexer extends BaseRegionObserver { private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates) throws Exception { //short circuit, if we don't need to do any work - if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) { + if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) { // already did the index update in prePut, so we are done return; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index 4e329e9..b9174b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -12,17 +12,19 @@ package org.apache.phoenix.hbase.index.builder; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.Collection; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexCodec; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; /** @@ -91,6 +93,16 @@ public abstract class BaseIndexBuilder implements IndexBuilder { return this.codec.isEnabled(m); } + @Override + public boolean isAtomicOp(Mutation m) throws IOException { + return false; + } + + @Override + public List<Mutation> executeAtomicOp(Increment inc) throws IOException { + return null; + } + /** * Exposed for testing! * http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index f411b8e..325904d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import 
org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -178,6 +179,14 @@ public class IndexBuildManager implements Stoppable { return delegate.isEnabled(m); } + public boolean isAtomicOp(Mutation m) throws IOException { + return delegate.isAtomicOp(m); + } + + public List<Mutation> executeAtomicOp(Increment inc) throws IOException { + return delegate.executeAtomicOp(inc); + } + @Override public void stop(String why) { if (stopped) { @@ -196,4 +205,5 @@ public class IndexBuildManager implements Stoppable { public IndexBuilder getBuilderForTesting() { return this.delegate; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index 36aba77..dff205a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -19,11 +19,13 @@ package org.apache.phoenix.hbase.index.builder; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -65,19 +67,10 @@ public interface IndexBuilder extends Stoppable { * Implementers must ensure that this method is thread-safe - it could (and probably will) be * called concurrently for different mutations, which may or may not be part of the 
same batch. * @param mutation update to the primary table to be indexed. - * @param context TODO + * @param context index meta data for the mutation * @return a Map of the mutations to make -> target index table name * @throws IOException on failure */ - /* TODO: - Create BaseIndexBuilder with everything except getIndexUpdate(). - Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder. - NonTxIndexBuilder will be current impl of this method. - TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values. - Conditionally don't do WALEdit stuff for txnal table (ensure Phoenix/HBase tolerates index WAl edit info not being there) - Noop Failure mode - */ - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException; /** @@ -139,4 +132,20 @@ public interface IndexBuilder extends Stoppable { * @throws IOException */ public boolean isEnabled(Mutation m) throws IOException; + + /** + * True if mutation has an ON DUPLICATE KEY clause + * @param m mutation + * @return true if mutation has ON DUPLICATE KEY expression and false otherwise. + * @throws IOException + */ + public boolean isAtomicOp(Mutation m) throws IOException; + + /** + * Calculate the mutations based on the ON DUPLICATE KEY clause + * @param inc increment to run against + * @return list of mutations as a result of executing the ON DUPLICATE KEY clause + * or null if Increment does not represent an ON DUPLICATE KEY clause. 
+ */ + public List<Mutation> executeAtomicOp(Increment inc) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index 93de11e..e6d683e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -23,7 +23,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; * added to the codec, as well as potentially not having to implement some methods. */ public interface IndexCodec { - /** * Do any code initialization necessary * http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2325a41/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java index e3bd7a8..741bf87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java @@ -18,9 +18,11 @@ package org.apache.phoenix.hbase.index.util; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; @@ -40,13 +42,14 @@ public abstract class KeyValueBuilder { * @throws 
RuntimeException if there is an IOException thrown from the underlying {@link Put} */ @SuppressWarnings("javadoc") - public static void addQuietly(Put put, KeyValueBuilder builder, KeyValue kv) { - try { - put.add(kv); - } catch (IOException e) { - throw new RuntimeException("KeyValue Builder " + builder + " created an invalid kv: " - + kv + "!"); + public static void addQuietly(Mutation m, KeyValueBuilder builder, KeyValue kv) { + byte [] family = CellUtil.cloneFamily(kv); + List<Cell> list = m.getFamilyCellMap().get(family); + if (list == null) { + list = new ArrayList<Cell>(); + m.getFamilyCellMap().put(family, list); } + list.add(kv); } /**