This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 6e5449c0f9ddc5aa7ce865bcb82a2c97dd0fe79f Author: chenglei <cheng...@apache.org> AuthorDate: Thu Jul 15 17:02:22 2021 +0800 PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator --- .../org/apache/phoenix/end2end/AggregateIT.java | 71 ++++++ .../expression/KeyValueColumnExpression.java | 7 +- .../iterate/DistinctAggregatingResultIterator.java | 135 +++++----- .../DistinctAggregatingResultIteratorTest.java | 273 +++++++++++++++++++++ 4 files changed, 406 insertions(+), 80 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java index a1b5426..fbb16d0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateIT.java @@ -333,5 +333,76 @@ public class AggregateIT extends BaseAggregateIT { } } + @Test + public void testDistinctAggregatingResultIteratorBug6507() throws Exception { + doTestDistinctAggregatingResultIteratorBug6507(false, false); + doTestDistinctAggregatingResultIteratorBug6507(false, true); + doTestDistinctAggregatingResultIteratorBug6507(true, false); + doTestDistinctAggregatingResultIteratorBug6507(true, true); + } + + private void doTestDistinctAggregatingResultIteratorBug6507(boolean desc ,boolean salted) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = null; + try { + conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String sql = "create table " + tableName + "( "+ + " pk1 varchar not null , " + + " pk2 varchar not null, " + + " pk3 varchar not null," + + " v1 varchar, " + + " v2 varchar, " + + " CONSTRAINT TEST_PK PRIMARY KEY ( "+ + "pk1 "+(desc ? "desc" : "")+", "+ + "pk2 "+(desc ? "desc" : "")+", "+ + "pk3 "+(desc ? "desc" : "")+ + " )) "+(salted ? "SALT_BUCKETS =4" : "split on('b')"); + conn.createStatement().execute(sql); + + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a11','a12','a13','a14','a15')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a21','a22','a23','a24','a25')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','a33','a38','a35')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b11','b12','b13','b14','b15')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b21','b22','b23','b24','b25')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b31','b32','b33','b34','b35')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','c12','c13','a34','a35')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','c13','a34','a35')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','d13','a35','a35')"); + conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('d31','a32','c13','a35','a35')"); + conn.commit(); + + sql = "select distinct pk1,max(v1) from "+tableName+" group by pk1,pk2,pk3 order by pk1,pk2,pk3"; + + ResultSet rs = conn.prepareStatement(sql).executeQuery(); + assertResultSet(rs, new Object[][]{ + {"a11","a14"}, + {"a21","a24"}, + {"a31","a38"}, + {"a31","a34"}, + {"a31","a35"}, + {"b11","b14"}, + {"b21","b24"}, + {"b31","b34"}, + {"d31","a35"}}); + + sql = "select distinct pk2,max(v1) from "+tableName+" group by pk2,pk3 order by pk2,pk3"; + + rs = conn.prepareStatement(sql).executeQuery(); + assertResultSet(rs, new Object[][]{ + {"a12","a14"}, + {"a22","a24"}, + {"a32","a38"}, + {"a32","a35"}, + {"b12","b14"}, + {"b22","b24"}, + {"b32","b34"}, + {"c12","a34"}}); + } finally { + if(conn != null) { + conn.close(); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java index f8432c5..62665e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java @@ -45,7 +45,12 @@ public class KeyValueColumnExpression extends ColumnExpression { public KeyValueColumnExpression() { } - + + public KeyValueColumnExpression(final byte[] cf, final byte[] cq) { + this.cf = cf; + this.cq = cq; + } + public KeyValueColumnExpression(PColumn column) { super(column); this.cf = column.getFamilyName().getBytes(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java index a412582..59cd333 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java @@ -18,18 +18,17 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; -import java.util.Collections; -import java.util.Iterator; +import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExplainPlanAttributes .ExplainPlanAttributesBuilder; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; @@ -43,27 +42,44 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; * @since 1.2 */ public class DistinctAggregatingResultIterator implements AggregatingResultIterator { - private final AggregatingResultIterator delegate; + /** + * Original AggregatingResultIterator + */ + private final AggregatingResultIterator targetAggregatingResultIterator; private final RowProjector rowProjector; - private Iterator<ResultEntry> resultIterator; - private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable(); - private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable(); + /** + * Cached tuples already seen. + */ + private final Set<ResultEntry> resultEntries = + Sets.<ResultEntry>newHashSet(); private class ResultEntry { + /** + * cached hashCode. + */ private final int hashCode; private final Tuple result; + /** + * cached column values. + */ + private final ImmutableBytesPtr[] columnValues; ResultEntry(Tuple result) { - final int prime = 31; this.result = result; - int hashCode = 0; - for (ColumnProjector column : rowProjector.getColumnProjectors()) { - Expression e = column.getExpression(); - if (e.evaluate(this.result, ptr1)) { - hashCode = prime * hashCode + ptr1.hashCode(); + this.columnValues = + new ImmutableBytesPtr[rowProjector.getColumnCount()]; + int columnIndex = 0; + for (ColumnProjector columnProjector : rowProjector.getColumnProjectors()) { + Expression expression = columnProjector.getExpression(); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + if (!expression.evaluate(this.result, ptr)) { + columnValues[columnIndex] = null; + } else { + columnValues[columnIndex] = ptr; } + columnIndex++; } - this.hashCode = hashCode; + this.hashCode = Arrays.hashCode(columnValues); } @Override @@ -78,91 +94,53 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera return false; } ResultEntry that = (ResultEntry) o; - for (ColumnProjector column : rowProjector.getColumnProjectors()) { - Expression e = column.getExpression(); - boolean isNull1 = !e.evaluate(this.result, ptr1); - boolean isNull2 = !e.evaluate(that.result, ptr2); - if (isNull1 && isNull2) { - return true; - } - if (isNull1 || isNull2) { - return false; - } - if (ptr1.compareTo(ptr2) != 0) { - return false; - } - } - return true; + return Arrays.equals(this.columnValues, that.columnValues); } - + @Override public int hashCode() { return hashCode; } - - Tuple getResult() { - return result; - } } - - protected ResultIterator getDelegate() { - return delegate; - } - + public DistinctAggregatingResultIterator(AggregatingResultIterator delegate, RowProjector rowProjector) { - this.delegate = delegate; + this.targetAggregatingResultIterator = delegate; this.rowProjector = rowProjector; } @Override public Tuple next() throws SQLException { - Iterator<ResultEntry> iterator = getResultIterator(); - if (iterator.hasNext()) { - ResultEntry entry = iterator.next(); - Tuple tuple = entry.getResult(); - aggregate(tuple); - return tuple; - } - resultIterator = Collections.emptyIterator(); - return null; - } - - private Iterator<ResultEntry> getResultIterator() throws SQLException { - if (resultIterator != null) { - return resultIterator; - } - - Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size? - try { - for (Tuple result = delegate.next(); result != null; result = delegate.next()) { - ResultEntry entry = new ResultEntry(result); - entries.add(entry); + while (true) { + Tuple nextTuple = this.targetAggregatingResultIterator.next(); + if (nextTuple == null) { + return null; + } + ResultEntry resultEntry = new ResultEntry(nextTuple); + if (!this.resultEntries.contains(resultEntry)) { + this.resultEntries.add(resultEntry); + return nextTuple; } - } finally { - delegate.close(); } - - resultIterator = entries.iterator(); - return resultIterator; } @Override - public void close() { - resultIterator = Collections.emptyIterator(); - } - + public void close() throws SQLException { + this.targetAggregatingResultIterator.close(); + } @Override public void explain(List<String> planSteps) { - delegate.explain(planSteps); + targetAggregatingResultIterator.explain(planSteps); planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString()); } @Override public void explain(List<String> planSteps, ExplainPlanAttributesBuilder explainPlanAttributesBuilder) { - delegate.explain(planSteps, explainPlanAttributesBuilder); + targetAggregatingResultIterator.explain( + planSteps, + explainPlanAttributesBuilder); explainPlanAttributesBuilder.setClientDistinctFilter( rowProjector.toString()); planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString()); @@ -170,13 +148,12 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera @Override public Aggregator[] aggregate(Tuple result) { - return delegate.aggregate(result); + return targetAggregatingResultIterator.aggregate(result); } - @Override - public String toString() { - return "DistinctAggregatingResultIterator [delegate=" + delegate - + ", rowProjector=" + rowProjector + ", resultIterator=" - + resultIterator + ", ptr1=" + ptr1 + ", ptr2=" + ptr2 + "]"; - } + @Override + public String toString() { + return "DistinctAggregatingResultIterator [targetAggregatingResultIterator=" + targetAggregatingResultIterator + + ", rowProjector=" + rowProjector; + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java new file mode 100644 index 0000000..65b7382 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/DistinctAggregatingResultIteratorTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.AssertResults; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class DistinctAggregatingResultIteratorTest { + private final static byte[] cf = Bytes.toBytes("cf"); + private final static byte[] cq1 = Bytes.toBytes("cq1"); + private final static byte[] cq2 = Bytes.toBytes("cq2"); + private final static byte[] cq3 = Bytes.toBytes("cq3"); + private final static byte[] rowKey1 = Bytes.toBytes("rowKey1"); + private final static byte[] rowKey2 = Bytes.toBytes("rowKey2"); + private final static byte[] rowKey3 = Bytes.toBytes("rowKey3"); + private final static byte[] rowKey4 = Bytes.toBytes("rowKey4"); + private final static byte[] rowKey5 = Bytes.toBytes("rowKey4"); + + @Test + public void testDistinctAggregatingResultIterator() throws Throwable { + //Test with duplicate + Tuple[] input1 = new Tuple[] { + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)), + new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)), + new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)), + new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))), + null + + }; + + Tuple[] result1 = new Tuple[] { + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)), + new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)), + new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)), + new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))) + + }; + RowProjector mockRowProjector = Mockito.mock(RowProjector.class); + Mockito.when(mockRowProjector.getColumnCount()).thenReturn(2); + + KeyValueColumnExpression columnExpression1 = new KeyValueColumnExpression(cf, cq1); + KeyValueColumnExpression columnExpression2 = new KeyValueColumnExpression(cf, cq2); + final ColumnProjector mockColumnProjector1 = Mockito.mock(ColumnProjector.class); + Mockito.when(mockColumnProjector1.getExpression()).thenReturn(columnExpression1); + final ColumnProjector mockColumnProjector2 = Mockito.mock(ColumnProjector.class); + Mockito.when(mockColumnProjector2.getExpression()).thenReturn(columnExpression2); + + Mockito.when(mockRowProjector.getColumnProjectors()).thenAnswer( + new Answer<List<ColumnProjector> >() { + @Override + public List<ColumnProjector> answer(InvocationOnMock invocation) throws Throwable { + return Arrays.asList(mockColumnProjector1,mockColumnProjector2); + } + }); + + assertResults( + input1, result1, mockRowProjector); + + //Test with duplicate and null + Tuple[] input2 = new Tuple[] { + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)), + new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)), + new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)), + new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))), + + null + + }; + + Tuple[] result2 = new Tuple[] { + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq2, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey3, cf, cq1, PInteger.INSTANCE.toBytes(4)), + new KeyValue(rowKey3, cf, cq2, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey1, cf, cq2, PInteger.INSTANCE.toBytes(1)), + new KeyValue(rowKey1, cf, cq1, PInteger.INSTANCE.toBytes(2)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey4, cf, cq1, PInteger.INSTANCE.toBytes(7)), + new KeyValue(rowKey4, cf, cq2, PInteger.INSTANCE.toBytes(8)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey2, cf, cq1, PInteger.INSTANCE.toBytes(11)), + new KeyValue(rowKey2, cf, cq3, PInteger.INSTANCE.toBytes(12)))), + + new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey5, cf, cq1, PInteger.INSTANCE.toBytes(90)), + new KeyValue(rowKey5, cf, cq2, PInteger.INSTANCE.toBytes(100)))) + + }; + assertResults( + input2, result2, mockRowProjector); + + //Test with no duplicate + int n = 100; + Tuple[] input3 = new Tuple[n + 1]; + for(int i = 0; i <= n; i++) { + byte[] rowKey = PInteger.INSTANCE.toBytes(i); + input3[i] = new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(i + 1)), + new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(i + 2)))); + } + input3[n] = null; + Tuple[] result3 = Arrays.copyOfRange(input3, 0, n); + assertResults( + input3, result3, mockRowProjector); + + //Test with all duplicate + Tuple[] input4 = new Tuple[n + 1]; + for(int i = 0; i <= n; i++) { + byte[] rowKey = PInteger.INSTANCE.toBytes(1); + input4[i] = new MultiKeyValueTuple( + Arrays.<Cell> asList( + new KeyValue(rowKey, cf, cq1, PInteger.INSTANCE.toBytes(2)), + new KeyValue(rowKey, cf, cq2, PInteger.INSTANCE.toBytes(3)))); + } + input4[n] = null; + Tuple[] result4 = new Tuple[] {input4[0]}; + assertResults( + input4, result4, mockRowProjector); + + } + + private void assertResults(Tuple[] input, Tuple[] result, RowProjector rowProjector) throws Exception { + AggregatingResultIterator mockAggregatingResultIterator = + Mockito.mock(AggregatingResultIterator.class); + Mockito.when(mockAggregatingResultIterator.next()).thenReturn( + input[0], Arrays.copyOfRange(input, 1, input.length)); + + DistinctAggregatingResultIterator distinctAggregatingResultIterator = + new DistinctAggregatingResultIterator(mockAggregatingResultIterator, rowProjector); + AssertResults.assertResults( + distinctAggregatingResultIterator, result); + } + +}