Repository: phoenix Updated Branches: refs/heads/4.0 b9d1722c6 -> 43d58a7bd
Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37 Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43d58a7b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43d58a7b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43d58a7b Branch: refs/heads/4.0 Commit: 43d58a7bd98238ab4d45efd56b99c3b3a35e3550 Parents: b9d1722 Author: Eli Levine <elilev...@apache.org> Authored: Thu Feb 26 21:04:25 2015 -0800 Committer: Eli Levine <elilev...@apache.org> Committed: Thu Feb 26 21:04:25 2015 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 302 +++++++++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 13 +- .../apache/phoenix/compile/UpsertCompiler.java | 13 +- .../apache/phoenix/execute/CommitException.java | 35 ++- .../apache/phoenix/execute/MutationState.java | 156 ++++++---- .../apache/phoenix/jdbc/PhoenixConnection.java | 37 ++- .../phoenix/jdbc/PhoenixPreparedStatement.java | 7 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 3 + .../phoenix/execute/MutationStateTest.java | 64 ++++ .../java/org/apache/phoenix/query/BaseTest.java | 2 +- 10 files changed, 543 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java new file mode 100644 index 0000000..550d7de --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -0,0 +1,302 @@ +/* + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Sets.newHashSet; +import static java.util.Collections.singletonList; +import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver; +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; +import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; +import static org.apache.phoenix.util.TestUtil.LOCALHOST; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class PartialCommitIT { + + private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase(); + private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me"); + private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"; + private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table"; + private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'"; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static String url; + private static Driver driver; + private static final Properties props = new Properties(); + + static { + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); + } + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + setUpConfigForMiniCluster(conf); + conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class); + conf.setBoolean("hbase.coprocessor.abortonerror", false); + conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false); + TEST_UTIL.startMiniCluster(); + String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); + url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort + + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; + + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Must update config before starting server + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + createTablesWithABitOfData(); + } + + private static void createTablesWithABitOfData() throws Exception { + Properties props = new Properties(); + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); + + try (Connection con = driver.connect(url, new Properties())) { + Statement sta = con.createStatement(); + sta.execute("create table a_success_table (k varchar primary key, c varchar)"); + sta.execute("create table b_failure_table (k varchar primary key, c varchar)"); + sta.execute("create table c_success_table (k varchar primary key, c varchar)"); + con.commit(); + } + + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); + + try (Connection con = driver.connect(url, new Properties())) { + con.setAutoCommit(false); + Statement sta = con.createStatement(); + for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) { + sta.execute("upsert into " + table + " values ('z', 'z')"); + sta.execute("upsert into " + table + " values ('zz', 'zz')"); + sta.execute("upsert into " + table + " values ('zzz', 'zzz')"); + } + con.commit(); + } + } + + @AfterClass + public static void teardownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testNoFailure() { + testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false, + singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1))); + } + + @Test + public void testUpsertFailure() { + testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", + UPSERT_TO_FAIL, + "upsert into a_success_table values ('testUpsertFailure2', 'b')"), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(2), new Integer(0))); + } + + @Test + public void testUpsertSelectFailure() throws SQLException { + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); + + try (Connection con = driver.connect(url, new Properties())) { + con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"); + con.commit(); + } + + testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", + UPSERT_SELECT_TO_FAIL), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(2), new Integer(0))); + } + + @Test + public void testDeleteFailure() { + testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", + DELETE_TO_FAIL, + "upsert into a_success_table values ('testDeleteFailure2', 'b')"), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList(new Integer(2), new Integer(1))); + } + + /** + * {@link MutationState} keeps mutations ordered lexicographically by table name. + */ + @Test + public void testOrderOfMutationsIsPredicatable() { + testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order + UPSERT_TO_FAIL, + "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order + 2, new int[]{0,1}, true, + newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(0), new Integer(1), new Integer(0))); + } + + @Test + public void checkThatAllStatementTypesMaintainOrderInConnection() { + testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", + "upsert into a_success_table select k, c from c_success_table", + DELETE_TO_FAIL, + "select * from a_success_table", + UPSERT_TO_FAIL), + 2, new int[]{2,4}, true, + newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList(new Integer(4), new Integer(0), new Integer(1))); + } + + private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail, + List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) { + Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size()); + + try (Connection con = getConnectionWithTableOrderPreservingMutationState()) { + con.setAutoCommit(false); + Statement sta = con.createStatement(); + for (String statement : statements) { + sta.execute(statement); + } + try { + con.commit(); + if (willFail) { + fail("Expected at least one statement in the list to fail"); + } else { + assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit() + } + } catch (SQLException sqle) { + if (!willFail) { + fail("Expected no statements to fail"); + } + assertEquals(CommitException.class, sqle.getClass()); + int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes(); + assertEquals(failureCount, uncommittedStatementIndexes.length); + assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes); + } + + // verify data in HBase + for (int i = 0; i < countStatementsForVerification.size(); i++) { + String countStatement = countStatementsForVerification.get(i); + ResultSet rs = sta.executeQuery(countStatement); + if (!rs.next()) { + fail("Expected a single row from count query"); + } + assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1)); + } + } catch (SQLException e) { + fail(e.toString()); + } + } + + private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { + Connection con = driver.connect(url, new Properties()); + PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); + final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); + return new PhoenixConnection(phxCon) { + protected MutationState newMutationState(int maxSize) { + return new MutationState(maxSize, this, mutations); + }; + }; + } + + public static class FailingRegionObserver extends SimpleRegionObserver { + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, + final Durability durability) throws HBaseIOException { + if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) { + // throwing anything other than instances of IOException result + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } + } + + private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow()); + } + + private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + return TABLE_NAME_TO_FAIL.equals(tableName) && + // Phoenix deletes are sent as Puts with empty values + put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0; + } + } + + /** + * Used for ordering {@link MutationState#mutations} map. + */ + private static class TableRefComparator implements Comparator<TableRef> { + @Override + public int compare(TableRef tr1, TableRef tr2) { + return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString()); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 322d24a..6f51a4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -39,6 +39,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMetaDataCacheClient; @@ -106,8 +107,8 @@ public class DeleteCompiler { ConnectionQueryServices services = connection.getQueryServices(); final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null; + Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. if (indexTableRef != null) { @@ -147,11 +148,11 @@ public class DeleteCompiler { } table.newKey(ptr, values); } - mutations.put(ptr, PRow.DELETE_MARKER); + mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); if (indexTableRef != null) { ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map rs.getCurrentRow().getKey(indexPtr); - indexMutations.put(indexPtr, PRow.DELETE_MARKER); + indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -429,9 +430,9 @@ public class DeleteCompiler { // keys for our ranges ScanRanges ranges = context.getScanRanges(); Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); + Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); while (iterator.hasNext()) { - mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER); + mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); } return new MutationState(tableRef, mutation, 0, maxSize, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index b21cc2f..f172814 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -42,6 +42,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; @@ -95,7 +96,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class UpsertCompiler { - private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) { + private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) { Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -114,7 +115,7 @@ public class UpsertCompiler { } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); table.newKey(ptr, pkValues); - mutation.put(ptr, columnValues); + mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter())); } private static MutationState upsertSelect(PhoenixStatement statement, @@ -128,7 +129,7 @@ public class UpsertCompiler { boolean isAutoCommit = connection.getAutoCommit(); byte[][] values = new byte[columnIndexes.length][]; int rowCount = 0; - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); PTable table = tableRef.getTable(); ResultSet rs = new PhoenixResultSet(iterator, projector, statement); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @@ -156,7 +157,7 @@ public class UpsertCompiler { column.getMaxLength(), column.getScale(), column.getSortOrder()); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { @@ -802,8 +803,8 @@ public class UpsertCompiler { throw new IllegalStateException(); } } - Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1); - setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation); + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); + setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement); return new MutationState(tableRef, mutation, 0, maxSize, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java index 63bf6a1..a9d8311 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java @@ -19,23 +19,32 @@ package org.apache.phoenix.execute; import java.sql.SQLException; +import org.apache.phoenix.jdbc.PhoenixConnection; + public class CommitException extends SQLException { - private static final long serialVersionUID = 1L; - private final MutationState uncommittedState; - private final MutationState committedState; + private static final long serialVersionUID = 2L; + private final int[] uncommittedStatementIndexes; - public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) { + public CommitException(Exception e, int[] uncommittedStatementIndexes) { super(e); - this.uncommittedState = uncommittedState; - this.committedState = committedState; - } - - public MutationState getUncommittedState() { - return uncommittedState; + this.uncommittedStatementIndexes = uncommittedStatementIndexes; } - public MutationState getCommittedState() { - return committedState; + /** + * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned + * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to + * commit/rollback. + * <p> + * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed. + * <p> + * Statement indexes are maintained correctly for connections that mutate and query + * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order + * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls + * back connections after metadata mutations. + * + * @see PhoenixConnection#getStatementExecutionCounter() + */ + public int[] getUncommittedStatementIndexes() { + return uncommittedStatementIndexes; } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 04626a6..8053f15 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -19,6 +19,7 @@ package org.apache.phoenix.execute; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -61,9 +62,11 @@ import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.sun.istack.NotNull; /** * @@ -78,40 +81,32 @@ public class MutationState implements SQLCloseable { private PhoenixConnection connection; private final long maxSize; private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); - private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? + private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; private long sizeOffset; private int numRows = 0; - public MutationState(int maxSize, PhoenixConnection connection) { + MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) { + this.maxSize = maxSize; + this.connection = connection; + this.mutations = mutations; + } + + public MutationState(long maxSize, PhoenixConnection connection) { this(maxSize,connection,0); } - public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) { - this.maxSize = maxSize; - this.connection = connection; + public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) { + this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize())); this.sizeOffset = sizeOffset; } - public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this.maxSize = maxSize; - this.connection = connection; + public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { + this(maxSize, connection, sizeOffset); this.mutations.put(table, mutations); - this.sizeOffset = sizeOffset; this.numRows = mutations.size(); throwIfTooBig(); } - private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) { - this.maxSize = maxSize; - this.connection = connection; - this.sizeOffset = sizeOffset; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { - numRows += entry.getValue().size(); - this.mutations.put(entry.getKey(), entry.getValue()); - } - throwIfTooBig(); - } - private void throwIfTooBig() { if (numRows > maxSize) { // TODO: throw SQLException ? @@ -134,29 +129,28 @@ public class MutationState implements SQLCloseable { } this.sizeOffset += newMutation.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue()); + Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue()); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new - for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { + for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { // Replace existing row with new row - Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); - if (existingValues != null) { + RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); + if (existingRowMutationState != null) { + Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER) { - Map<PColumn,byte[]> newRow = rowEntry.getValue(); + Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. if (newRow != PRow.DELETE_MARKER) { - // Replace existing column values with new column values - for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) { - existingValues.put(valueEntry.getKey(), valueEntry.getValue()); - } + // Merge existing column values with new column values + existingRowMutationState.join(rowEntry.getValue()); // Now that the existing row has been merged with the new row, replace it back - // again (since it was replaced with the new one above). - existingRows.put(rowEntry.getKey(), existingValues); + // again (since it was merged with the new one above). + existingRows.put(rowEntry.getKey(), existingRowMutationState); } } } else { @@ -176,16 +170,16 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) { + private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) { final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : Iterators.<PTable>emptyIterator(); final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; - Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); + Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); + Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next(); ImmutableBytesPtr key = rowEntry.getKey(); PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); List<Mutation> rowMutations, rowMutationsPertainingToIndex; @@ -197,7 +191,7 @@ public class MutationState implements SQLCloseable { // delete rows). rowMutationsPertainingToIndex = Collections.emptyList(); } else { - for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { + for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { row.setValue(valueEntry.getKey(), valueEntry.getValue()); } rowMutations = row.toRowMutations(); @@ -249,14 +243,14 @@ public class MutationState implements SQLCloseable { } public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) { - final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Iterators.emptyIterator(); } Long scn = connection.getSCN(); final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; return new Iterator<Pair<byte[],List<Mutation>>>() { - private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next(); + private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { @@ -297,7 +291,7 @@ public class MutationState implements SQLCloseable { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { TableRef tableRef = entry.getKey(); long serverTimeStamp = tableRef.getTimeStamp(); PTable table = tableRef.getTable(); @@ -312,12 +306,15 @@ public class MutationState implements SQLCloseable { // TODO: use bitset? table = result.getTable(); PColumn[] columns = new PColumn[table.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { - Map<PColumn,byte[]> valueEntry = rowEntry.getValue(); - if (valueEntry != PRow.DELETE_MARKER) { - for (PColumn column : valueEntry.keySet()) { - columns[column.getPosition()] = column; - } + for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { + RowMutationState valueEntry = rowEntry.getValue(); + if (valueEntry != null) { + Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); + if (colValues != PRow.DELETE_MARKER) { + for (PColumn column : colValues.keySet()) { + columns[column.getPosition()] = column; + } + } } } for (PColumn column : columns) { @@ -357,15 +354,14 @@ public class MutationState implements SQLCloseable { int i = 0; byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); long[] serverTimeStamps = validate(); - Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); - List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size()); + Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); // add tracing for this operation TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); Span span = trace.getSpan(); while (iterator.hasNext()) { - Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next(); - Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue(); + Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next(); + Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue(); TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); table.getIndexMaintainers(tempPtr, connection); @@ -425,7 +421,6 @@ public class MutationState implements SQLCloseable { child.stop(); shouldRetry = false; if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection)); - committedList.add(entry); } catch (Exception e) { SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); if (inferredE != null) { @@ -446,9 +441,7 @@ public class MutationState implements SQLCloseable { } e = inferredE; } - // Throw to client with both what was committed so far and what is left to be committed. - // That way, client can either undo what was done or try again with what was not done. - sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection)); + sqlE = new CommitException(e, getUncommittedSattementIndexes()); } finally { try { hTable.close(); @@ -488,7 +481,64 @@ public class MutationState implements SQLCloseable { numRows = 0; } + private int[] getUncommittedSattementIndexes() { + int[] result = new int[0]; + for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) { + for (RowMutationState rowMutationState : rowMutations.values()) { + result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes()); + } + } + return result; + } + @Override public void close() throws SQLException { } + + public static int[] joinSortedIntArrays(int[] a, int[] b) { + int[] result = new int[a.length + b.length]; + int i = 0, j = 0, k = 0, current; + while (i < a.length && j < b.length) { + current = a[i] < b[j] ? a[i++] : b[j++]; + for ( ; i < a.length && a[i] == current; i++); + for ( ; j < b.length && b[j] == current; j++); + result[k++] = current; + } + while (i < a.length) { + for (current = a[i++] ; i < a.length && a[i] == current; i++); + result[k++] = current; + } + while (j < b.length) { + for (current = b[j++] ; j < b.length && b[j] == current; j++); + result[k++] = current; + } + return Arrays.copyOf(result, k); + } + + public static class RowMutationState { + private Map<PColumn,byte[]> columnValues; + private int[] statementIndexes; + + public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) { + Preconditions.checkNotNull(columnValues); + + this.columnValues = columnValues; + this.statementIndexes = new int[] {statementIndex}; + } + + Map<PColumn, byte[]> getColumnValues() { + return columnValues; + } + + int[] getStatementIndexes() { + return statementIndexes; + } + + void join(RowMutationState newRow) { + getColumnValues().putAll(newRow.getColumnValues()); + statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); + } + + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index de9e323..c9ac94a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.function.FunctionArgumentType; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -121,21 +122,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final Properties info; private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); - private final MutationState mutationState; + private MutationState mutationState; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; private PMetaData metaData; private final PName tenantId; - private final String datePattern; + private final String datePattern; private final String timePattern; private final String timestampPattern; - + private int statementExecutionCounter; private boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; - private Map<String, String> customTracingAnnotations = emptyMap(); - + private Map<String, String> customTracingAnnotations = emptyMap(); + static { Tracing.addTraceMetricsSource(); } @@ -150,17 +151,20 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; + this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { this(connection.getQueryServices(), connection, scn); this.sampler = connection.sampler; + this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; + this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { @@ -233,7 +237,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } }); - this.mutationState = new MutationState(maxSize, this); + this.mutationState = newMutationState(maxSize); this.services.addConnection(this); // setup tracing, if its enabled @@ -361,6 +365,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return metaData; } + protected MutationState newMutationState(int maxSize) { + return new MutationState(maxSize, this); + } + public MutationState getMutationState() { return mutationState; } @@ -426,6 +434,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return null; } }, Tracing.withTracing(this, "committing mutations")); + statementExecutionCounter = 0; } @Override @@ -626,6 +635,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { mutationState.rollback(this); + statementExecutionCounter = 0; } @Override @@ -776,4 +786,19 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public KeyValueBuilder getKeyValueBuilder() { return this.services.getKeyValueBuilder(); } + + /** + * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before + * commit or rollback. 0-based. Used to associate partial save errors with SQL statements + * invoked by users. + * @see CommitException + * @see #incrementStatementExecutionCounter() + */ + public int getStatementExecutionCounter() { + return statementExecutionCounter; + } + + public void incrementStatementExecutionCounter() { + statementExecutionCounter++; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index 25be8c0..a23484c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.ExecuteQueryNotApplicableException; import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.SQLCloseable; @@ -79,8 +79,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar private final String query; - public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, - IOException { + public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException { super(connection); this.statement = parser.nextStatement(new ExecutableNodeFactory()); if (this.statement == null) { throw new EOFException(); } @@ -89,7 +88,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar this.parameters = Arrays.asList(new Object[statement.getBindCount()]); Collections.fill(parameters, BindManager.UNBOUND_PARAMETER); } - + public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException { super(connection); this.query = query; http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 4ca5bb5..c6d086a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -151,6 +151,7 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement { + private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class); public enum Operation { @@ -243,6 +244,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); setLastUpdateOperation(stmt.getOperation()); + connection.incrementStatementExecutionCounter(); return rs; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException @@ -289,6 +291,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount()); setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); + connection.incrementStatementExecutionCounter(); return lastUpdateCount; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java new file mode 100644 index 0000000..67c3353 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class MutationStateTest { + + @Test + public void testJoinIntArrays() { + // simple case + int[] a = new int[] {1}; + int[] b = new int[] {2}; + int[] result = joinSortedIntArrays(a, b); + + assertEquals(2, result.length); + assertArrayEquals(new int[] {1,2}, result); + + // empty arrays + a = new int[0]; + b = new int[0]; + result = joinSortedIntArrays(a, b); + + assertEquals(0, result.length); + assertArrayEquals(new int[] {}, result); + + // dupes between arrays + a = new int[] {1,2,3}; + b = new int[] {1,2,4}; + result = joinSortedIntArrays(a, b); + + assertEquals(4, result.length); + assertArrayEquals(new int[] {1,2,3,4}, result); + + // dupes within arrays + a = new int[] {1,2,2,3}; + b = new int[] {1,2,4}; + result = joinSortedIntArrays(a, b); + + assertEquals(4, result.length); + assertArrayEquals(new int[] {1,2,3,4}, result); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/43d58a7b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 9947440..b64eff8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -655,7 +655,7 @@ public abstract class BaseTest { * Create a {@link PhoenixTestDriver} and register it. * @return an initialized and registered {@link PhoenixTestDriver} */ - protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception { + public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception { PhoenixTestDriver newDriver = new PhoenixTestDriver(props); DriverManager.registerDriver(newDriver); Driver oldDriver = DriverManager.getDriver(url);