PHOENIX-3208 MutationState.toMutations method would throw an exception if multiple tables are upserted (chenglei)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ecb9360f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ecb9360f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ecb9360f Branch: refs/heads/encodecolumns2 Commit: ecb9360f61efba077a70880d472602e3768b0935 Parents: 83b0ebe Author: James Taylor <jamestay...@apache.org> Authored: Wed Nov 2 09:59:06 2016 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Wed Nov 2 13:23:28 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 3 +- .../phoenix/execute/MutationStateTest.java | 75 ++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ecb9360f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 9d1344b..cb66968 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -706,7 +706,7 @@ public class MutationState implements SQLCloseable { } @Override - public Pair<byte[], List<Mutation>> next() { + public Pair<byte[], List<Mutation>> next() { Pair<PName, List<Mutation>> pair = mutationIterator.next(); return new Pair<byte[], List<Mutation>>(pair.getFirst().getBytes(), pair.getSecond()); } @@ -727,6 +727,7 @@ public class MutationState implements SQLCloseable { public Pair<byte[], List<Mutation>> next() { if (!innerIterator.hasNext()) { current = iterator.next(); + innerIterator=init(); } return innerIterator.next(); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ecb9360f/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java index 4c596ad..276d946 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -20,7 +20,20 @@ package org.apache.phoenix.execute; import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.schema.types.PUnsignedInt; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.PhoenixRuntime; import org.junit.Test; public class MutationStateTest { @@ -59,4 +72,66 @@ public class MutationStateTest { assertEquals(4, result.length); assertArrayEquals(new int[] {1,2,3,4}, result); } + + private static String getUrl() { + return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + PhoenixRuntime.CONNECTIONLESS; + } + + @Test + public void testToMutationsOverMultipleTables() throws Exception { + Connection conn = null; + try { + conn=DriverManager.getConnection(getUrl()); + conn.createStatement().execute( + "create table MUTATION_TEST1"+ + "( id1 UNSIGNED_INT not null primary key,"+ + "appId1 VARCHAR)"); + conn.createStatement().execute( + "create table MUTATION_TEST2"+ + "( id2 
UNSIGNED_INT not null primary key,"+ + "appId2 VARCHAR)"); + + conn.createStatement().execute("upsert into MUTATION_TEST1(id1,appId1) values(111,'app1')"); + conn.createStatement().execute("upsert into MUTATION_TEST2(id2,appId2) values(222,'app2')"); + + + Iterator<Pair<byte[],List<KeyValue>>> dataTableNameAndMutationKeyValuesIter = + PhoenixRuntime.getUncommittedDataIterator(conn); + + + assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext()); + Pair<byte[],List<KeyValue>> pair=dataTableNameAndMutationKeyValuesIter.next(); + String tableName1=Bytes.toString(pair.getFirst()); + List<KeyValue> keyValues1=pair.getSecond(); + + assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext()); + pair=dataTableNameAndMutationKeyValuesIter.next(); + String tableName2=Bytes.toString(pair.getFirst()); + List<KeyValue> keyValues2=pair.getSecond(); + + if("MUTATION_TEST1".equals(tableName1)) { + assertTable(tableName1, keyValues1, tableName2, keyValues2); + } + else { + assertTable(tableName2, keyValues2, tableName1, keyValues1); + } + assertTrue(!dataTableNameAndMutationKeyValuesIter.hasNext()); + } + finally { + if(conn!=null) { + conn.close(); + } + } + } + + private void assertTable(String tableName1,List<KeyValue> keyValues1,String tableName2,List<KeyValue> keyValues2) { + assertTrue("MUTATION_TEST1".equals(tableName1)); + assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(111),CellUtil.cloneRow(keyValues1.get(0)))); + assertTrue("app1".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues1.get(0))))); + + assertTrue("MUTATION_TEST2".equals(tableName2)); + assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(222),CellUtil.cloneRow(keyValues2.get(0)))); + assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(0))))); + + } }