This is an automated email from the ASF dual-hosted git repository. pboado pushed a commit to branch 4.x-cdh5.16 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 0e5a2635ea023d72459e63bd6443f3733642482b Author: jaanai <jaa...@apache.org> AuthorDate: Sat Jan 5 13:17:42 2019 +0000 PHOENIX-5055 Split mutations batches probably affects correctness of index data --- .../apache/phoenix/end2end/MutationStateIT.java | 47 +++++++++++++++++++++- .../org/apache/phoenix/end2end/QueryMoreIT.java | 6 +-- .../org/apache/phoenix/execute/MutationState.java | 41 ++++++++++++++----- .../apache/phoenix/execute/MutationStateTest.java | 41 +++++++++++++++++++ 4 files changed, 122 insertions(+), 13 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 36782c1..5a5fb56 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -25,8 +25,14 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.Iterator; import java.util.Properties; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -157,5 +163,44 @@ public class MutationStateIT extends ParallelStatsDisabledIT { stmt.execute(); assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize()); } - + + @Test + public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception { + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IDX_" + generateUniqueName(); + Properties props = new Properties(); + props.put("phoenix.mutate.batchSize", "2"); + try (PhoenixConnection conn = 
DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) { + conn.setAutoCommit(false); + conn.createStatement().executeUpdate( + "CREATE TABLE " + tableName + " (" + + "A VARCHAR NOT NULL PRIMARY KEY," + + "B VARCHAR," + + "C VARCHAR," + + "D VARCHAR) COLUMN_ENCODED_BYTES = 0"); + conn.createStatement().executeUpdate("CREATE INDEX " + indexName + " on " + tableName + " (C) INCLUDE(D)"); + + conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A2','B2','C2','D2')"); + conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A3','B3', 'C3', null)"); + conn.commit(); + + Table htable = conn.getQueryServices().getTable(Bytes.toBytes(tableName)); + Scan scan = new Scan(); + scan.setRaw(true); + Iterator<Result> scannerIter = htable.getScanner(scan).iterator(); + while (scannerIter.hasNext()) { + long ts = -1; + Result r = scannerIter.next(); + for (Cell cell : r.listCells()) { + if (ts == -1) { + ts = cell.getTimestamp(); + } else { + assertEquals("(" + cell.toString() + ") has different ts", ts, cell.getTimestamp()); + } + } + } + htable.close(); + } + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index 2b1d31e..7c45f1a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -493,14 +493,14 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { connection.commit(); assertEquals(2L, connection.getMutationState().getBatchCount()); - // set the batch size (rows) to 1 - connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1"); + // set the batch size (rows) to 2 since there are at least 2 mutations when updating a single row + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2"); 
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128"); connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); upsertRows(connection, fullTableName); connection.commit(); // each row should be in its own batch - assertEquals(4L, connection.getMutationState().getBatchCount()); + assertEquals(2L, connection.getMutationState().getBatchCount()); } private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 14f13b3..e33a005 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -100,6 +100,7 @@ import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -1092,34 +1093,56 @@ public class MutationState implements SQLCloseable { } /** - * Split the list of mutations into multiple lists that don't exceed row and byte thresholds + * + * Split the list of mutations into multiple lists. since a single row update can contain multiple mutations, + * we only check if the current batch has exceeded the row or size limit for different rows, + * so that mutations for a single row don't end up in different batches. 
* * @param allMutationList * List of HBase mutations * @return List of lists of mutations */ - public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, - List<Mutation> allMutationList) { + public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) { + Preconditions.checkArgument(batchSize> 1, + "Mutation types are put or delete, for one row all mutations must be in one batch."); + Preconditions.checkArgument(batchSizeBytes > 0, "Batch size must be larger than 0"); List<List<Mutation>> mutationBatchList = Lists.newArrayList(); List<Mutation> currentList = Lists.newArrayList(); + List<Mutation> sameRowList = Lists.newArrayList(); long currentBatchSizeBytes = 0L; - for (Mutation mutation : allMutationList) { - long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation); - if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) { + for (int i = 0; i < allMutationList.size(); ) { + long sameRowBatchSize = 1L; + Mutation mutation = allMutationList.get(i); + long sameRowMutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation); + sameRowList.add(mutation); + while (i + 1 < allMutationList.size() && + Bytes.compareTo(allMutationList.get(i + 1).getRow(), mutation.getRow()) == 0) { + Mutation sameRowMutation = allMutationList.get(i + 1); + sameRowList.add(sameRowMutation); + sameRowMutationSizeBytes += KeyValueUtil.calculateMutationDiskSize(sameRowMutation); + sameRowBatchSize++; + i++; + } + + if (currentList.size() + sameRowBatchSize > batchSize || + currentBatchSizeBytes + sameRowMutationSizeBytes > batchSizeBytes) { if (currentList.size() > 0) { mutationBatchList.add(currentList); currentList = Lists.newArrayList(); currentBatchSizeBytes = 0L; } } - currentList.add(mutation); - currentBatchSizeBytes += mutationSizeBytes; + + currentList.addAll(sameRowList); + currentBatchSizeBytes += 
sameRowMutationSizeBytes; + sameRowList.clear(); + i++; } + if (currentList.size() > 0) { mutationBatchList.add(currentList); } return mutationBatchList; - } public byte[] encodeTransaction() throws SQLException { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java index 8553b73..22662b2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -29,6 +29,9 @@ import java.util.List; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.schema.types.PUnsignedInt; @@ -36,6 +39,8 @@ import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.PhoenixRuntime; import org.junit.Test; +import com.google.common.collect.ImmutableList; + public class MutationStateTest { @Test @@ -134,4 +139,40 @@ public class MutationStateTest { assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1))))); } + + @Test + public void testGetMutationBatchList() { + byte[] r1 = Bytes.toBytes(1); + byte[] r2 = Bytes.toBytes(2); + byte[] r3 = Bytes.toBytes(3); + byte[] r4 = Bytes.toBytes(4); + // one put and one delete as a group + { + List<Mutation> list = ImmutableList.of(new Put(r1), new Put(r2), new Delete(r2)); + List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list); + assertTrue(batchLists.size() == 2); + assertEquals(batchLists.get(0).size(), 1); + assertEquals(batchLists.get(1).size(), 2); + } + + { + List<Mutation> list = ImmutableList.of(new Put(r1), new Delete(r1), new Put(r2)); + List<List<Mutation>> 
batchLists = MutationState.getMutationBatchList(2, 10, list); + assertTrue(batchLists.size() == 2); + assertEquals(batchLists.get(0).size(), 2); + assertEquals(batchLists.get(1).size(), 1); + } + + { + List<Mutation> list = ImmutableList.of(new Put(r3), new Put(r1), new Delete(r1), new Put(r2), new Put(r4), new Delete(r4)); + List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list); + assertTrue(batchLists.size() == 4); + assertEquals(batchLists.get(0).size(), 1); + assertEquals(batchLists.get(1).size(), 2); + assertEquals(batchLists.get(2).size(), 1); + assertEquals(batchLists.get(3).size(), 2); + } + + } + }