Revert "Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37"
This reverts commit fa58c7821a2e8fce30a8c0ff6e42aa00134dbce0. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/569469a4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/569469a4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/569469a4 Branch: refs/heads/calcite Commit: 569469a46bae57cc4d6cbbcd7e01d535560f07e2 Parents: fa58c78 Author: Eli Levine <elilev...@apache.org> Authored: Fri Feb 27 11:15:28 2015 -0800 Committer: Eli Levine <elilev...@apache.org> Committed: Fri Feb 27 11:15:28 2015 -0800 ---------------------------------------------------------------------- .../apache/phoenix/execute/PartialCommitIT.java | 302 ------------------- .../apache/phoenix/compile/DeleteCompiler.java | 13 +- .../apache/phoenix/compile/UpsertCompiler.java | 13 +- .../apache/phoenix/execute/CommitException.java | 35 +-- .../apache/phoenix/execute/MutationState.java | 156 ++++------ .../apache/phoenix/jdbc/PhoenixConnection.java | 37 +-- .../phoenix/jdbc/PhoenixPreparedStatement.java | 7 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 3 - .../phoenix/execute/MutationStateTest.java | 64 ---- .../java/org/apache/phoenix/query/BaseTest.java | 2 +- 10 files changed, 89 insertions(+), 543 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java deleted file mode 100644 index 550d7de..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2014 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.execute; - -import static com.google.common.collect.Lists.newArrayList; -import static com.google.common.collect.Sets.newHashSet; -import static java.util.Collections.singletonList; -import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver; -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; -import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.Driver; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -@Category(NeedsOwnMiniClusterTest.class) -public class PartialCommitIT { - - private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase(); - private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me"); - private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"; - private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table"; - private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'"; - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static String url; - private static Driver driver; - private static final Properties props = new Properties(); - - static { - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); - } - - @BeforeClass - public static void setupCluster() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - setUpConfigForMiniCluster(conf); - conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class); - conf.setBoolean("hbase.coprocessor.abortonerror", false); - conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false); - TEST_UTIL.startMiniCluster(); - String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); - url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort - + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; - - Map<String, String> props = Maps.newHashMapWithExpectedSize(1); - // Must update config before starting server - props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); - createTablesWithABitOfData(); - } - - private static void createTablesWithABitOfData() throws Exception { - Properties props = new Properties(); - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); - - try (Connection con = driver.connect(url, new Properties())) { - Statement sta = con.createStatement(); - sta.execute("create table a_success_table (k varchar primary key, c varchar)"); - sta.execute("create table b_failure_table (k varchar primary key, c varchar)"); - sta.execute("create table c_success_table (k varchar primary key, c varchar)"); - con.commit(); - } - - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); - - try (Connection con = driver.connect(url, new Properties())) { - con.setAutoCommit(false); - Statement sta = con.createStatement(); - for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) { - sta.execute("upsert into " + table + " values ('z', 'z')"); - sta.execute("upsert into " + table + " values ('zz', 'zz')"); - sta.execute("upsert into " + table + " values ('zzz', 'zzz')"); - } - con.commit(); - } - } - - @AfterClass - public static void teardownCluster() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testNoFailure() { - testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false, - singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1))); - } - - @Test - public void testUpsertFailure() { - testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", - UPSERT_TO_FAIL, - "upsert into a_success_table values ('testUpsertFailure2', 'b')"), - 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), - newArrayList(new Integer(2), new Integer(0))); - } - - @Test - public void testUpsertSelectFailure() throws SQLException { - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); - - try (Connection con = driver.connect(url, new Properties())) { - con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"); - con.commit(); - } - - testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", - UPSERT_SELECT_TO_FAIL), - 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), - newArrayList(new Integer(2), new Integer(0))); - } - - @Test - public void testDeleteFailure() { - testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", - DELETE_TO_FAIL, - "upsert into a_success_table values ('testDeleteFailure2', 'b')"), - 1, new int[]{1}, true, - newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), - newArrayList(new Integer(2), new Integer(1))); - } - - /** - * {@link MutationState} keeps mutations ordered lexicographically by table name. - */ - @Test - public void testOrderOfMutationsIsPredicatable() { - testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order - UPSERT_TO_FAIL, - "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order - 2, new int[]{0,1}, true, - newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), - newArrayList(new Integer(0), new Integer(1), new Integer(0))); - } - - @Test - public void checkThatAllStatementTypesMaintainOrderInConnection() { - testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", - "upsert into a_success_table select k, c from c_success_table", - DELETE_TO_FAIL, - "select * from a_success_table", - UPSERT_TO_FAIL), - 2, new int[]{2,4}, true, - newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'", - "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), - newArrayList(new Integer(4), new Integer(0), new Integer(1))); - } - - private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail, - List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) { - Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size()); - - try (Connection con = getConnectionWithTableOrderPreservingMutationState()) { - con.setAutoCommit(false); - Statement sta = con.createStatement(); - for (String statement : statements) { - sta.execute(statement); - } - try { - con.commit(); - if (willFail) { - fail("Expected at least one statement in the list to fail"); - } else { - assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit() - } - } catch (SQLException sqle) { - if (!willFail) { - fail("Expected no statements to fail"); - } - assertEquals(CommitException.class, sqle.getClass()); - int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes(); - assertEquals(failureCount, uncommittedStatementIndexes.length); - assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes); - } - - // verify data in HBase - for (int i = 0; i < countStatementsForVerification.size(); i++) { - String countStatement = countStatementsForVerification.get(i); - ResultSet rs = sta.executeQuery(countStatement); - if (!rs.next()) { - fail("Expected a single row from count query"); - } - assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1)); - } - } catch (SQLException e) { - fail(e.toString()); - } - } - - private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { - Connection con = driver.connect(url, new Properties()); - PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); - final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); - return new PhoenixConnection(phxCon) { - protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this, mutations); - }; - }; - } - - public static class FailingRegionObserver extends SimpleRegionObserver { - @Override - public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, - final Durability durability) throws HBaseIOException { - if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) { - // throwing anything other than instances of IOException result - // in this coprocessor being unloaded - // DoNotRetryIOException tells HBase not to retry this mutation - // multiple times - throw new DoNotRetryIOException(); - } - } - - private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { - String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow()); - } - - private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { - String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - return TABLE_NAME_TO_FAIL.equals(tableName) && - // Phoenix deletes are sent as Puts with empty values - put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0; - } - } - - /** - * Used for ordering {@link MutationState#mutations} map. - */ - private static class TableRefComparator implements Comparator<TableRef> { - @Override - public int compare(TableRef tr1, TableRef tr2) { - return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString()); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 6f51a4c..322d24a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -39,7 +39,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMetaDataCacheClient; @@ -107,8 +106,8 @@ public class DeleteCompiler { ConnectionQueryServices services = connection.getQueryServices(); final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); - Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null; // If indexTableRef is set, we're deleting the rows from both the index table and // the data table through a single query to save executing an additional one. if (indexTableRef != null) { @@ -148,11 +147,11 @@ public class DeleteCompiler { } table.newKey(ptr, values); } - mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); + mutations.put(ptr, PRow.DELETE_MARKER); if (indexTableRef != null) { ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map rs.getCurrentRow().getKey(indexPtr); - indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); + indexMutations.put(indexPtr, PRow.DELETE_MARKER); } if (mutations.size() > maxSize) { throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); @@ -430,9 +429,9 @@ public class DeleteCompiler { // keys for our ranges ScanRanges ranges = context.getScanRanges(); Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); - Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); while (iterator.hasNext()) { - mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter())); + mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER); } return new MutationState(tableRef, mutation, 0, maxSize, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index f172814..b21cc2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -42,7 +42,6 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; @@ -96,7 +95,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class UpsertCompiler { - private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) { + private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) { Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -115,7 +114,7 @@ public class UpsertCompiler { } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); table.newKey(ptr, pkValues); - mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter())); + mutation.put(ptr, columnValues); } private static MutationState upsertSelect(PhoenixStatement statement, @@ -129,7 +128,7 @@ public class UpsertCompiler { boolean isAutoCommit = connection.getAutoCommit(); byte[][] values = new byte[columnIndexes.length][]; int rowCount = 0; - Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize); PTable table = tableRef.getTable(); ResultSet rs = new PhoenixResultSet(iterator, projector, statement); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @@ -157,7 +156,7 @@ public class UpsertCompiler { column.getMaxLength(), column.getScale(), column.getSortOrder()); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { @@ -803,8 +802,8 @@ public class UpsertCompiler { throw new IllegalStateException(); } } - Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); - setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement); + Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1); + setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation); return new MutationState(tableRef, mutation, 0, maxSize, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java index a9d8311..63bf6a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java @@ -19,32 +19,23 @@ package org.apache.phoenix.execute; import java.sql.SQLException; -import org.apache.phoenix.jdbc.PhoenixConnection; - public class CommitException extends SQLException { - private static final long serialVersionUID = 2L; - private final int[] uncommittedStatementIndexes; + private static final long serialVersionUID = 1L; + private final MutationState uncommittedState; + private final MutationState committedState; - public CommitException(Exception e, int[] uncommittedStatementIndexes) { + public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) { super(e); - this.uncommittedStatementIndexes = uncommittedStatementIndexes; + this.uncommittedState = uncommittedState; + this.committedState = committedState; + } + + public MutationState getUncommittedState() { + return uncommittedState; } - /** - * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned - * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to - * commit/rollback. - * <p> - * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed. - * <p> - * Statement indexes are maintained correctly for connections that mutate and query - * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order - * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls - * back connections after metadata mutations. - * - * @see PhoenixConnection#getStatementExecutionCounter() - */ - public int[] getUncommittedStatementIndexes() { - return uncommittedStatementIndexes; + public MutationState getCommittedState() { + return committedState; } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 8053f15..04626a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -19,7 +19,6 @@ package org.apache.phoenix.execute; import java.io.IOException; import java.sql.SQLException; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -62,11 +61,9 @@ import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.sun.istack.NotNull; /** * @@ -81,32 +78,40 @@ public class MutationState implements SQLCloseable { private PhoenixConnection connection; private final long maxSize; private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); - private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; + private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? private long sizeOffset; private int numRows = 0; - MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) { - this.maxSize = maxSize; - this.connection = connection; - this.mutations = mutations; - } - - public MutationState(long maxSize, PhoenixConnection connection) { + public MutationState(int maxSize, PhoenixConnection connection) { this(maxSize,connection,0); } - public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize())); + public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) { + this.maxSize = maxSize; + this.connection = connection; this.sizeOffset = sizeOffset; } - public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this(maxSize, connection, sizeOffset); + public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { + this.maxSize = maxSize; + this.connection = connection; this.mutations.put(table, mutations); + this.sizeOffset = sizeOffset; this.numRows = mutations.size(); throwIfTooBig(); } + private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) { + this.maxSize = maxSize; + this.connection = connection; + this.sizeOffset = sizeOffset; + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { + numRows += entry.getValue().size(); + this.mutations.put(entry.getKey(), entry.getValue()); + } + throwIfTooBig(); + } + private void throwIfTooBig() { if (numRows > maxSize) { // TODO: throw SQLException ? @@ -129,28 +134,29 @@ public class MutationState implements SQLCloseable { } this.sizeOffset += newMutation.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; - Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue()); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue()); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new - for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { + for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { // Replace existing row with new row - RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); - if (existingRowMutationState != null) { - Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); + Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); + if (existingValues != null) { if (existingValues != PRow.DELETE_MARKER) { - Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); + Map<PColumn,byte[]> newRow = rowEntry.getValue(); // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. if (newRow != PRow.DELETE_MARKER) { - // Merge existing column values with new column values - existingRowMutationState.join(rowEntry.getValue()); + // Replace existing column values with new column values + for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) { + existingValues.put(valueEntry.getKey(), valueEntry.getValue()); + } // Now that the existing row has been merged with the new row, replace it back - // again (since it was merged with the new one above). - existingRows.put(rowEntry.getKey(), existingRowMutationState); + // again (since it was replaced with the new one above). + existingRows.put(rowEntry.getKey(), existingValues); } } } else { @@ -170,16 +176,16 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) { + private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) { final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : Iterators.<PTable>emptyIterator(); final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; - Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator(); + Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next(); + Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next(); ImmutableBytesPtr key = rowEntry.getKey(); PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key); List<Mutation> rowMutations, rowMutationsPertainingToIndex; @@ -191,7 +197,7 @@ public class MutationState implements SQLCloseable { // delete rows). rowMutationsPertainingToIndex = Collections.emptyList(); } else { - for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { + for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) { row.setValue(valueEntry.getKey(), valueEntry.getValue()); } rowMutations = row.toRowMutations(); @@ -243,14 +249,14 @@ public class MutationState implements SQLCloseable { } public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) { - final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); + final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Iterators.emptyIterator(); } Long scn = connection.getSCN(); final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; return new Iterator<Pair<byte[],List<Mutation>>>() { - private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next(); + private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next(); private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init(); private Iterator<Pair<byte[],List<Mutation>>> init() { @@ -291,7 +297,7 @@ public class MutationState implements SQLCloseable { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long[] timeStamps = new long[this.mutations.size()]; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { TableRef tableRef = entry.getKey(); long serverTimeStamp = tableRef.getTimeStamp(); PTable table = tableRef.getTable(); @@ -306,15 +312,12 @@ public class MutationState implements SQLCloseable { // TODO: use bitset? table = result.getTable(); PColumn[] columns = new PColumn[table.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { - RowMutationState valueEntry = rowEntry.getValue(); - if (valueEntry != null) { - Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); - if (colValues != PRow.DELETE_MARKER) { - for (PColumn column : colValues.keySet()) { - columns[column.getPosition()] = column; - } - } + for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) { + Map<PColumn,byte[]> valueEntry = rowEntry.getValue(); + if (valueEntry != PRow.DELETE_MARKER) { + for (PColumn column : valueEntry.keySet()) { + columns[column.getPosition()] = column; + } } } for (PColumn column : columns) { @@ -354,14 +357,15 @@ public class MutationState implements SQLCloseable { int i = 0; byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); long[] serverTimeStamps = validate(); - Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); + Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); + List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size()); // add tracing for this operation TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); Span span = trace.getSpan(); while (iterator.hasNext()) { - Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next(); - Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue(); + Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next(); + Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue(); TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); table.getIndexMaintainers(tempPtr, connection); @@ -421,6 +425,7 @@ public class MutationState implements SQLCloseable { child.stop(); shouldRetry = false; if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection)); + committedList.add(entry); } catch (Exception e) { SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); if (inferredE != null) { @@ -441,7 +446,9 @@ public class MutationState implements SQLCloseable { } e = inferredE; } - sqlE = new CommitException(e, getUncommittedSattementIndexes()); + // Throw to client with both what was committed so far and what is left to be committed. + // That way, client can either undo what was done or try again with what was not done. + sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection)); } finally { try { hTable.close(); @@ -481,64 +488,7 @@ public class MutationState implements SQLCloseable { numRows = 0; } - private int[] getUncommittedSattementIndexes() { - int[] result = new int[0]; - for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) { - for (RowMutationState rowMutationState : rowMutations.values()) { - result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes()); - } - } - return result; - } - @Override public void close() throws SQLException { } - - public static int[] joinSortedIntArrays(int[] a, int[] b) { - int[] result = new int[a.length + b.length]; - int i = 0, j = 0, k = 0, current; - while (i < a.length && j < b.length) { - current = a[i] < b[j] ? a[i++] : b[j++]; - for ( ; i < a.length && a[i] == current; i++); - for ( ; j < b.length && b[j] == current; j++); - result[k++] = current; - } - while (i < a.length) { - for (current = a[i++] ; i < a.length && a[i] == current; i++); - result[k++] = current; - } - while (j < b.length) { - for (current = b[j++] ; j < b.length && b[j] == current; j++); - result[k++] = current; - } - return Arrays.copyOf(result, k); - } - - public static class RowMutationState { - private Map<PColumn,byte[]> columnValues; - private int[] statementIndexes; - - public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) { - Preconditions.checkNotNull(columnValues); - - this.columnValues = columnValues; - this.statementIndexes = new int[] {statementIndex}; - } - - Map<PColumn, byte[]> getColumnValues() { - return columnValues; - } - - int[] getStatementIndexes() { - return statementIndexes; - } - - void join(RowMutationState newRow) { - getColumnValues().putAll(newRow.getColumnValues()); - statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); - } - - - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index c9ac94a..de9e323 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.function.FunctionArgumentType; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -122,21 +121,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final Properties info; private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); - private MutationState mutationState; + private final MutationState mutationState; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; private PMetaData metaData; private final PName tenantId; - private final String datePattern; + private final String datePattern; private final String timePattern; private final String timestampPattern; - private int statementExecutionCounter; + private boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; - private Map<String, String> customTracingAnnotations = emptyMap(); - + private Map<String, String> customTracingAnnotations = emptyMap(); + static { Tracing.addTraceMetricsSource(); } @@ -151,20 +150,17 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; - this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { this(connection.getQueryServices(), connection, scn); this.sampler = connection.sampler; - this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; - this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { @@ -237,7 +233,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd } }); - this.mutationState = newMutationState(maxSize); + this.mutationState = new MutationState(maxSize, this); this.services.addConnection(this); // setup tracing, if its enabled @@ -365,10 +361,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return metaData; } - protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this); - } - public MutationState getMutationState() { return mutationState; } @@ -434,7 +426,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return null; } }, Tracing.withTracing(this, "committing mutations")); - statementExecutionCounter = 0; } @Override @@ -635,7 +626,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { mutationState.rollback(this); - statementExecutionCounter = 0; } @Override @@ -786,19 +776,4 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public KeyValueBuilder getKeyValueBuilder() { return this.services.getKeyValueBuilder(); } - - /** - * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before - * commit or rollback. 0-based. Used to associate partial save errors with SQL statements - * invoked by users. - * @see CommitException - * @see #incrementStatementExecutionCounter() - */ - public int getStatementExecutionCounter() { - return statementExecutionCounter; - } - - public void incrementStatementExecutionCounter() { - statementExecutionCounter++; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index a23484c..25be8c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.schema.ExecuteQueryNotApplicableException; import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException; -import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.SQLCloseable; @@ -79,7 +79,8 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar private final String query; - public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException { + public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, + IOException { super(connection); this.statement = parser.nextStatement(new ExecutableNodeFactory()); if (this.statement == null) { throw new EOFException(); } @@ -88,7 +89,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar this.parameters = Arrays.asList(new Object[statement.getBindCount()]); Collections.fill(parameters, BindManager.UNBOUND_PARAMETER); } - + public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException { super(connection); this.query = query; http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index c6d086a..4ca5bb5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -151,7 +151,6 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement { - private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class); public enum Operation { @@ -244,7 +243,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); setLastUpdateOperation(stmt.getOperation()); - connection.incrementStatementExecutionCounter(); return rs; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException @@ -291,7 +289,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount()); setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); - connection.incrementStatementExecutionCounter(); return lastUpdateCount; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java deleted file mode 100644 index 67c3353..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.execute; - -import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -public class MutationStateTest { - - @Test - public void testJoinIntArrays() { - // simple case - int[] a = new int[] {1}; - int[] b = new int[] {2}; - int[] result = joinSortedIntArrays(a, b); - - assertEquals(2, result.length); - assertArrayEquals(new int[] {1,2}, result); - - // empty arrays - a = new int[0]; - b = new int[0]; - result = joinSortedIntArrays(a, b); - - assertEquals(0, result.length); - assertArrayEquals(new int[] {}, result); - - // dupes between arrays - a = new int[] {1,2,3}; - b = new int[] {1,2,4}; - result = joinSortedIntArrays(a, b); - - assertEquals(4, result.length); - assertArrayEquals(new int[] {1,2,3,4}, result); - - // dupes within arrays - a = new int[] {1,2,2,3}; - b = new int[] {1,2,4}; - result = joinSortedIntArrays(a, b); - - assertEquals(4, result.length); - assertArrayEquals(new int[] {1,2,3,4}, result); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index b64eff8..9947440 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -655,7 +655,7 @@ public abstract class BaseTest { * Create a {@link PhoenixTestDriver} and register it. * @return an initialized and registered {@link PhoenixTestDriver} */ - public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception { + protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception { PhoenixTestDriver newDriver = new PhoenixTestDriver(props); DriverManager.registerDriver(newDriver); Driver oldDriver = DriverManager.getDriver(url);