This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-fluo.git
The following commit(s) were added to refs/heads/master by this push: new 2f11089 fixes #867 - Stream support for Scanners (#884) 2f11089 is described below commit 2f11089270f02c65404a7d0c9f7242254d992937 Author: Christopher McTague <cjmcta...@apache.org> AuthorDate: Tue Jul 25 08:48:51 2017 -0400 fixes #867 - Stream support for Scanners (#884) --- .../fluo/api/client/scanner/CellScanner.java | 10 + .../fluo/api/client/scanner/ColumnScanner.java | 12 ++ .../apache/fluo/api/client/scanner/RowScanner.java | 10 + .../fluo/api/client/scanner/ScannerStreamTest.java | 238 +++++++++++++++++++++ 4 files changed, 270 insertions(+) diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java index b2bf50e..90e631e 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java @@ -15,6 +15,9 @@ package org.apache.fluo.api.client.scanner; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import org.apache.fluo.api.data.RowColumnValue; /** @@ -22,4 +25,11 @@ import org.apache.fluo.api.data.RowColumnValue; */ public interface CellScanner extends Iterable<RowColumnValue> { + /** + * @since 1.2.0 + */ + default Stream<RowColumnValue> stream() { + return StreamSupport.stream(spliterator(), false); + } + } diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java index dd28ce9..6a61a88 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java @@ -15,6 +15,9 @@ package org.apache.fluo.api.client.scanner; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.ColumnValue; @@ -24,12 +27,21 @@ import org.apache.fluo.api.data.ColumnValue; public interface ColumnScanner extends Iterable<ColumnValue> { /** + * @since 1.0.0 * @return the row for all column values */ Bytes getRow(); /** + * @since 1.0.0 * @return the row for all column values decoded as UTF-8 */ String getsRow(); + + /** + * @since 1.2.0 + */ + default Stream<ColumnValue> stream() { + return StreamSupport.stream(spliterator(), false); + } } diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java index e65816b..3d679f8 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java @@ -15,9 +15,19 @@ package org.apache.fluo.api.client.scanner; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + /** * @since 1.0.0 */ public interface RowScanner extends Iterable<ColumnScanner> { + /** + * @since 1.2.0 + */ + default Stream<ColumnScanner> stream() { + return StreamSupport.stream(spliterator(), false); + } + } diff --git a/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java b/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java new file mode 100644 index 0000000..d885552 --- /dev/null +++ b/modules/api/src/test/java/org/apache/fluo/api/client/scanner/ScannerStreamTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.fluo.api.client.scanner; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumnValue; +import org.junit.Assert; +import org.junit.Test; + +/** + * @since 1.2.0 + */ +public class ScannerStreamTest { + + @Test + public void testCellScannerStream() { + Set<RowColumnValue> rowCols = new HashSet<>(); + Set<RowColumnValue> empty = new HashSet<>(); + + rowCols.add(new RowColumnValue("r1", new Column("f1", "q1"), "v1")); + rowCols.add(new RowColumnValue("r1", new Column("f2", "q3"), "v2")); + rowCols.add(new RowColumnValue("r2", new Column("f1", "q1"), "v3")); + rowCols.add(new RowColumnValue("r2", new Column("f1", "q2"), "v4")); + rowCols.add(new RowColumnValue("r4", new Column("f2", "q5"), "v5")); + + CellScannerImpl cellScanner = new CellScannerImpl(rowCols); + + Set<RowColumnValue> expected = + rowCols.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2")) + .collect(Collectors.toSet()); + Set<RowColumnValue> actualSubSet = + cellScanner.stream().filter(rcv -> rcv.getColumn().getsFamily().equals("f2")) + .collect(Collectors.toSet()); + + Assert.assertNotEquals(empty, actualSubSet); + Assert.assertNotEquals(empty, cellScanner.stream().collect(Collectors.toSet())); + Assert.assertEquals(rowCols, cellScanner.stream().collect(Collectors.toSet())); + Assert.assertEquals(expected, actualSubSet); + } + + @Test + public void testColumnScannerStream() { + Set<ColumnValue> colsVal = new HashSet<>(); + Set<ColumnValue> empty = new HashSet<>(); + + Bytes row = Bytes.of("123"); + colsVal.add(new ColumnValue(new Column("f1", "q1"), Bytes.of("v1"))); + colsVal.add(new ColumnValue(new Column("f2", "q3"), Bytes.of("v2"))); + colsVal.add(new ColumnValue(new Column("f1", "q1"), Bytes.of("v3"))); + colsVal.add(new ColumnValue(new Column("f1", "q2"), Bytes.of("v4"))); + colsVal.add(new ColumnValue(new Column("f2", "q5"), Bytes.of("v5"))); + + ColumnScanner colScanner = new ColumnScannerImpl(row, colsVal); + + Set<ColumnValue> expected = + colsVal.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2")) + .collect(Collectors.toSet()); + Set<ColumnValue> colSubSet = + colScanner.stream().filter(cv -> cv.getColumn().getsFamily().equals("f2")) + .collect(Collectors.toSet()); + + Assert.assertNotEquals(empty, colSubSet); + Assert.assertNotEquals(empty, colScanner.stream().collect(Collectors.toSet())); + Assert.assertEquals(colsVal, colScanner.stream().collect(Collectors.toSet())); + Assert.assertEquals(expected, colSubSet); + } + + @Test + public void testRowScannerStream() { + List<ColumnScanner> rows = new ArrayList<>(); + Set<ColumnValue> cv1 = new HashSet<>(); + Set<ColumnValue> cv2 = new HashSet<>(); + + Bytes row1 = Bytes.of("555555555"); + cv1.add(new ColumnValue(new Column("firstname"), Bytes.of("Chris"))); + cv1.add(new ColumnValue(new Column("lastname"), Bytes.of("McTague"))); + cv1.add(new ColumnValue(new Column("age"), Bytes.of("21"))); + + rows.add(new ColumnScannerImpl(row1, cv1)); + + Bytes row2 = Bytes.of("55234234"); + cv2.add(new ColumnValue(new Column("firstname"), Bytes.of("Hulk"))); + cv2.add(new ColumnValue(new Column("lastname"), Bytes.of("Hogan"))); + cv2.add(new ColumnValue(new Column("age"), Bytes.of("60"))); + + rows.add(new ColumnScannerImpl(row2, cv2)); + + RowScannerImpl rsi = new RowScannerImpl(rows); + + Set<Person> people = rsi.stream().map(cs -> toPerson(cs)).collect(Collectors.toSet()); + + Set<Person> expected = new HashSet<>(); + expected.add(new Person("Chris", "McTague", 21, 555555555)); + expected.add(new Person("Hulk", "Hogan", 60, 55234234)); + + Assert.assertEquals(expected, people); + } + + private static Person toPerson(ColumnScanner cs) { + Person p = new Person(); + p.id = Integer.parseInt(cs.getsRow()); + + for (ColumnValue cv : cs) { + switch (cv.getColumn().getsFamily()) { + case "firstname": { + p.firstname = cv.getsValue(); + break; + } + case "lastname": { + p.lastname = cv.getsValue(); + break; + } + case "age": { + p.age = Integer.parseInt(cv.getsValue()); + break; + } + default: { + throw new IllegalArgumentException("unknown column " + cv.getColumn()); + } + } + } + return p; + } + + private static class CellScannerImpl implements CellScanner { + private Collection<RowColumnValue> rcv; + + CellScannerImpl(Collection<RowColumnValue> rcv) { + this.rcv = rcv; + } + + @Override + public Iterator<RowColumnValue> iterator() { + return rcv.iterator(); + } + } + + private static class RowScannerImpl implements RowScanner { + private Collection<ColumnScanner> scan; + + RowScannerImpl(Collection<ColumnScanner> scan) { + this.scan = scan; + } + + @Override + public Iterator<ColumnScanner> iterator() { + return scan.iterator(); + } + } + + private static class ColumnScannerImpl implements ColumnScanner { + private Collection<ColumnValue> scan; + private Bytes row; + + ColumnScannerImpl(Bytes row, Collection<ColumnValue> scan) { + this.row = row; + this.scan = scan; + } + + @Override + public Iterator<ColumnValue> iterator() { + return scan.iterator(); + } + + @Override + public Bytes getRow() { + return row; + } + + @Override + public String getsRow() { + return row.toString(); + } + } + + private static class Person { + private String firstname; + private String lastname; + private int age; + private int id; + + Person() { + this.firstname = ""; + this.lastname = ""; + this.age = 0; + this.id = 0; + } + + Person(String f, String l, int a, int id) { + this.firstname = f; + this.lastname = l; + this.age = a; + this.id = id; + } + + @Override + public int hashCode() { + return Objects.hash(firstname, lastname, age, id); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + + if (o instanceof Person) { + Person p = (Person) o; + return (this.firstname.equals(p.firstname)) && (this.lastname.equals(p.lastname)) + && (this.age == p.age) && (this.id == p.id); + } + return false; + } + } +} -- To stop receiving notification emails like this one, please contact ['"comm...@fluo.apache.org" <comm...@fluo.apache.org>'].