Fixes #639: vastly improved the scanner API
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/23374784 Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/23374784 Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/23374784 Branch: refs/heads/master Commit: 233747847831efc4dad6cc030a895860ead7d71e Parents: e72a931 Author: Keith Turner <ktur...@apache.org> Authored: Wed Jul 13 18:09:15 2016 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Jul 15 11:27:13 2016 -0400 ---------------------------------------------------------------------- .../apache/fluo/api/client/SnapshotBase.java | 43 +++- .../fluo/api/client/scanner/CellScanner.java | 25 ++ .../fluo/api/client/scanner/ColumnScanner.java | 30 +++ .../fluo/api/client/scanner/RowScanner.java | 23 ++ .../api/client/scanner/RowScannerBuilder.java | 26 ++ .../fluo/api/client/scanner/ScannerBuilder.java | 61 +++++ .../fluo/api/config/ScannerConfiguration.java | 98 -------- .../org/apache/fluo/api/data/ColumnValue.java | 76 ++++++ .../fluo/api/iterator/ColumnIterator.java | 31 --- .../apache/fluo/api/iterator/RowIterator.java | 30 --- .../apache/fluo/cluster/runner/AppRunner.java | 107 ++++---- .../apache/fluo/cluster/runner/ScanTest.java | 8 +- .../fluo/core/impl/ColumnIteratorImpl.java | 90 ------- .../apache/fluo/core/impl/RowIteratorImpl.java | 77 ------ .../apache/fluo/core/impl/SnapshotScanner.java | 244 +++++++++++-------- .../apache/fluo/core/impl/TransactionImpl.java | 61 +++-- .../fluo/core/impl/scanner/CellScannerImpl.java | 57 +++++ .../core/impl/scanner/ColumnScannerImpl.java | 71 ++++++ .../fluo/core/impl/scanner/RowScannerImpl.java | 50 ++++ .../core/impl/scanner/ScannerBuilderImpl.java | 90 +++++++ .../fluo/core/log/TracingTransaction.java | 9 +- .../core/config/ScannerConfigurationTest.java | 92 ------- .../org/apache/fluo/integration/ITBase.java | 21 +- .../org/apache/fluo/integration/ITBaseImpl.java | 1 + 
.../fluo/integration/TestTransaction.java | 7 +- .../apache/fluo/integration/impl/FluoIT.java | 31 +-- .../apache/fluo/integration/impl/ScannerIT.java | 217 +++++++++++++++++ .../fluo/integration/impl/StochasticBankIT.java | 24 +- .../integration/impl/WeakNotificationIT.java | 21 +- .../apache/fluo/integration/impl/WorkerIT.java | 17 +- .../fluo/mapreduce/FluoEntryInputFormat.java | 45 ++-- .../fluo/mapreduce/FluoRowInputFormat.java | 37 +-- pom.xml | 4 +- 33 files changed, 1082 insertions(+), 742 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java index 778aa94..79e1cef 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java @@ -19,11 +19,11 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.fluo.api.config.ScannerConfiguration; +import org.apache.fluo.api.client.scanner.ScannerBuilder; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; -import org.apache.fluo.api.iterator.RowIterator; +import org.apache.fluo.api.data.Span; /** * Allows users to read from a Fluo table at a certain point in time @@ -57,9 +57,44 @@ public interface SnapshotBase { Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns); /** - * Retrieves a {@link RowIterator} with the given {@link ScannerConfiguration} + * This method is the starting point for constructing a scanner. Scanners can be constructed over + * a {@link Span} and/or with a subset of columns. 
Below is simple example of building a scanner. + * + * <pre> + * {@code + * Transaction tx = ...; + * Span span = Span.exact("row4"); + * Column col1 = new Column("fam1","qual1"); + * Column col2 = new Column("fam1","qual2"); + * + * //create a scanner over row4 fetching columns fam1:qual1 and fam1:qual2 + * CellScanner cs = tx.scanner().over(span).fetch(col1,col2).build(); + * for(RowColumnValue rcv : cs) { + * //do stuff with rcv + * } + * } + * </pre> + * + * <p> + * The following example shows how to build a row scanner. + * + * <pre> + * { + * @code + * RowScanner rs = tx.scanner().over(span).fetch(col1, col2).byRow().build(); + * for (ColumnScanner colScanner : rs) { + * Bytes row = colScanner.getRow(); + * for (ColumnValue cv : colScanner) { + * // do stuff with the columns and values in the row + * } + * } + * } + * </pre> + * + * @return A scanner builder. */ - RowIterator get(ScannerConfiguration config); + + ScannerBuilder scanner(); /** * Wrapper for {@link #get(Collection)} that uses Strings. All strings are encoded and decoded http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java new file mode 100644 index 0000000..b2bf50e --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/CellScanner.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.api.client.scanner; + +import org.apache.fluo.api.data.RowColumnValue; + +/** + * @since 1.0.0 + */ +public interface CellScanner extends Iterable<RowColumnValue> { + +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java new file mode 100644 index 0000000..9e790ea --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ColumnScanner.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.api.client.scanner; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.ColumnValue; + +/** + * @since 1.0.0 + */ +public interface ColumnScanner extends Iterable<ColumnValue> { + + /** + * @return the row for all column values + */ + Bytes getRow(); +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java new file mode 100644 index 0000000..e65816b --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScanner.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.api.client.scanner; + +/** + * @since 1.0.0 + */ +public interface RowScanner extends Iterable<ColumnScanner> { + +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java new file mode 100644 index 0000000..fa7c60e --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/RowScannerBuilder.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.api.client.scanner; + +/** + * @since 1.0.0 + */ +public interface RowScannerBuilder { + /** + * @return a new scanner created with any previously set restrictions + */ + RowScanner build(); +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java new file mode 100644 index 0000000..1217aa0 --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/client/scanner/ScannerBuilder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.api.client.scanner; + +import java.util.Collection; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * @since 1.0.0 + */ +public interface ScannerBuilder { + /** + * @param span restrict the scanner to data within span + * @return self + */ + ScannerBuilder over(Span span); + + + /** + * Passing in a Column with only the family set will fetch the entire column family. + * + * @param columns restrict the scanner to only these columns + * @return self + */ + ScannerBuilder fetch(Column... columns); + + /** + * Passing in a Column with only the family set will fetch the entire column family. + * + * @param columns restrict the scanner to only these columns + * @return self + */ + ScannerBuilder fetch(Collection<Column> columns); + + /** + * @return a new scanner created with any previously set restrictions + */ + CellScanner build(); + + /** + * Call this to build a row scanner. + * + * @return a row scanner builder using any previously set restrictions + */ + RowScannerBuilder byRow(); +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java deleted file mode 100644 index 3a73b50..0000000 --- a/modules/api/src/main/java/org/apache/fluo/api/config/ScannerConfiguration.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.api.config; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; - -import org.apache.fluo.api.client.SnapshotBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.Span; - -/** - * Contains configuration for a {@link org.apache.fluo.api.client.Snapshot} scanner. Passed to - * {@link SnapshotBase#get(ScannerConfiguration)}. 
- * - * @since 1.0.0 - */ -public class ScannerConfiguration implements Cloneable { - - private Span span = new Span(); - private Set<Column> columns = new HashSet<>(); - - /** - * Sets {@link Span} for ScannerConfiguration - */ - public ScannerConfiguration setSpan(Span span) { - Objects.requireNonNull(span); - this.span = span; - return this; - } - - /** - * Retrieves {@link Span} for ScannerConfiguration - */ - public Span getSpan() { - return span; - } - - /** - * List of all {@link Column}s that scanner will retrieve - */ - public Set<Column> getColumns() { - return Collections.unmodifiableSet(columns); - } - - /** - * Configures scanner to retrieve column with the given family - */ - public ScannerConfiguration fetchColumnFamily(Bytes fam) { - Objects.requireNonNull(fam); - columns.add(new Column(fam)); - return this; - } - - /** - * Configures scanner to retrieve column with the given family and qualifier - */ - public ScannerConfiguration fetchColumn(Bytes fam, Bytes qual) { - Objects.requireNonNull(fam); - Objects.requireNonNull(qual); - columns.add(new Column(fam, qual)); - return this; - } - - /** - * Clears all fetched column settings - */ - public void clearColumns() { - columns.clear(); - } - - @Override - @SuppressWarnings("unchecked") - public Object clone() throws CloneNotSupportedException { - ScannerConfiguration sc = (ScannerConfiguration) super.clone(); - - sc.columns = (Set<Column>) ((HashSet<Column>) columns).clone(); - sc.span = span; - - return sc; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java new file mode 100644 index 0000000..23a3741 --- /dev/null +++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java @@ 
-0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.api.data; + +import java.io.Serializable; + +/** + * @since 1.0.0 + */ + +public class ColumnValue implements Serializable, Comparable<ColumnValue> { + private static final long serialVersionUID = 1L; + + private Column column; + private Bytes val; + + public ColumnValue(Column col, Bytes val) { + this.column = col; + this.val = val; + } + + public ColumnValue(Column col, String val) { + this.column = col; + this.val = Bytes.of(val); + } + + public Column getColumn() { + return column; + } + + public Bytes getValue() { + return val; + } + + @Override + public int compareTo(ColumnValue o) { + int comp = column.compareTo(o.column); + if (comp == 0) { + comp = val.compareTo(o.val); + } + return comp; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ColumnValue) { + ColumnValue ocv = (ColumnValue) o; + return column.equals(ocv.column) && val.equals(ocv.val); + } + + return false; + } + + @Override + public int hashCode() { + return column.hashCode() + 31 * val.hashCode(); + } + + @Override + public String toString() { + return column + " " + val; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java deleted file mode 100644 index be04738..0000000 --- a/modules/api/src/main/java/org/apache/fluo/api/iterator/ColumnIterator.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.api.iterator; - -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; - -/** - * Iterator for Fluo columns - * - * @since 1.0.0 - */ -public interface ColumnIterator extends Iterator<Entry<Column, Bytes>> { - -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java b/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java deleted file mode 100644 index 21e53b1..0000000 --- a/modules/api/src/main/java/org/apache/fluo/api/iterator/RowIterator.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.api.iterator; - -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.fluo.api.data.Bytes; - -/** - * Iterator for Fluo rows - * - * @since 1.0.0 - */ -public interface RowIterator extends Iterator<Entry<Bytes, ColumnIterator>> { - -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java ---------------------------------------------------------------------- diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java index ca9afe0..d40ff9e 100644 --- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java +++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java @@ -17,7 +17,8 @@ package org.apache.fluo.cluster.runner; import java.lang.reflect.Method; import java.util.Arrays; -import java.util.Map; +import java.util.Collection; +import java.util.HashSet; import javax.inject.Provider; @@ -35,14 +36,13 @@ import org.apache.fluo.accumulo.format.FluoFormatter; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ScannerConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; import org.apache.fluo.api.exceptions.FluoException; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; import org.apache.fluo.cluster.util.FluoYarnConfig; import org.apache.fluo.core.impl.Environment; import org.apache.fluo.core.impl.Notification; @@ -68,9 +68,8 @@ public abstract class AppRunner { 
this.scriptName = scriptName; } - public static ScannerConfiguration buildScanConfig(ScanOptions options) { - ScannerConfiguration scanConfig = new ScannerConfiguration(); - + public static Span getSpan(ScanOptions options) { + Span span = new Span(); if ((options.getExactRow() != null) && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options .getRowPrefix() != null))) { @@ -87,35 +86,43 @@ public abstract class AppRunner { // configure span of scanner if (options.getExactRow() != null) { - scanConfig.setSpan(Span.exact(options.getExactRow())); + span = Span.exact(options.getExactRow()); } else if (options.getRowPrefix() != null) { - scanConfig.setSpan(Span.prefix(options.getRowPrefix())); + span = Span.prefix(options.getRowPrefix()); } else { if ((options.getStartRow() != null) && (options.getEndRow() != null)) { - scanConfig.setSpan(new Span(options.getStartRow(), true, options.getEndRow(), true)); + span = new Span(options.getStartRow(), true, options.getEndRow(), true); } else if (options.getStartRow() != null) { - scanConfig.setSpan(new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true)); + span = new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true); } else if (options.getEndRow() != null) { - scanConfig.setSpan(new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true)); + span = new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true); } } + return span; + } + + public static Collection<Column> getColumns(ScanOptions options) { + Collection<Column> columns = new HashSet<>(); + // configure columns of scanner for (String column : options.getColumns()) { String[] colFields = column.split(":"); if (colFields.length == 1) { - scanConfig.fetchColumnFamily(Bytes.of(colFields[0])); + columns.add(new Column(colFields[0])); } else if (colFields.length == 2) { - scanConfig.fetchColumn(Bytes.of(colFields[0]), Bytes.of(colFields[1])); + columns.add(new Column(colFields[0], colFields[1])); } else { throw new 
IllegalArgumentException("Failed to scan! Column '" + column + "' has too many fields (indicated by ':')"); } } - return scanConfig; + return columns; } + + public long scan(FluoConfiguration config, String[] args) { ScanOptions options = new ScanOptions(); JCommander jcommand = new JCommander(options); @@ -148,46 +155,46 @@ public abstract class AppRunner { try (FluoClient client = FluoFactory.newClient(sConfig)) { try (Snapshot s = client.newSnapshot()) { - ScannerConfiguration scanConfig = null; + Span span = null; + Collection<Column> columns = null; try { - scanConfig = buildScanConfig(options); + span = getSpan(options); + columns = getColumns(options); } catch (IllegalArgumentException e) { System.err.println(e.getMessage()); System.exit(-1); } - RowIterator iter = s.get(scanConfig); - - if (!iter.hasNext()) { - System.out.println("\nNo data found\n"); - } + CellScanner cellScanner = s.scanner().over(span).fetch(columns).build(); StringBuilder sb = new StringBuilder(); - while (iter.hasNext() && !System.out.checkError()) { - Map.Entry<Bytes, ColumnIterator> rowEntry = iter.next(); - ColumnIterator citer = rowEntry.getValue(); - while (citer.hasNext() && !System.out.checkError()) { - Map.Entry<Column, Bytes> colEntry = citer.next(); - if (options.hexEncNonAscii) { - sb.setLength(0); - Hex.encNonAscii(sb, rowEntry.getKey()); - sb.append(" "); - Hex.encNonAscii(sb, colEntry.getKey(), " "); - sb.append("\t"); - Hex.encNonAscii(sb, colEntry.getValue()); - System.out.println(sb.toString()); - } else { - sb.setLength(0); - sb.append(rowEntry.getKey()); - sb.append(" "); - sb.append(colEntry.getKey()); - sb.append("\t"); - sb.append(colEntry.getValue()); - System.out.println(sb.toString()); - } - entriesFound++; + for (RowColumnValue rcv : cellScanner) { + if (options.hexEncNonAscii) { + sb.setLength(0); + Hex.encNonAscii(sb, rcv.getRow()); + sb.append(" "); + Hex.encNonAscii(sb, rcv.getColumn(), " "); + sb.append("\t"); + Hex.encNonAscii(sb, rcv.getValue()); + 
System.out.println(sb.toString()); + } else { + sb.setLength(0); + sb.append(rcv.getsRow()); + sb.append(" "); + sb.append(rcv.getColumn()); + sb.append("\t"); + sb.append(rcv.getsValue()); + System.out.println(sb.toString()); + } + entriesFound++; + if (System.out.checkError()) { + break; } } + + if (entriesFound == 0) { + System.out.println("\nNo data found\n"); + } } catch (FluoException e) { System.out.println("Scan failed - " + e.getMessage()); } @@ -201,9 +208,11 @@ public abstract class AppRunner { Connector conn = AccumuloUtil.getConnector(sConfig); - ScannerConfiguration scanConfig = null; + Span span = null; + Collection<Column> columns = null; try { - scanConfig = buildScanConfig(options); + span = getSpan(options); + columns = getColumns(options); } catch (IllegalArgumentException e) { System.err.println(e.getMessage()); System.exit(-1); @@ -213,8 +222,8 @@ public abstract class AppRunner { try { Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY); - scanner.setRange(SpanUtil.toRange(scanConfig.getSpan())); - for (Column col : scanConfig.getColumns()) { + scanner.setRange(SpanUtil.toRange(span)); + for (Column col : columns) { if (col.isQualifierSet()) { scanner .fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier())); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java ---------------------------------------------------------------------- diff --git a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java index 2d7c17d..a84e8ea 100644 --- a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java +++ b/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java @@ -16,10 +16,10 @@ package org.apache.fluo.cluster.runner; import com.beust.jcommander.JCommander; -import 
org.apache.fluo.api.config.ScannerConfiguration; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.impl.SnapshotScanner; import org.junit.Assert; import org.junit.Test; @@ -28,16 +28,16 @@ import org.junit.Test; */ public class ScanTest { - private ScannerConfiguration parseArgs(String args) { + private SnapshotScanner.Opts parseArgs(String args) { ScanOptions options = new ScanOptions(); JCommander jcommand = new JCommander(options); jcommand.parse(args.split(" ")); - return AppRunner.buildScanConfig(options); + return new SnapshotScanner.Opts(AppRunner.getSpan(options), AppRunner.getColumns(options)); } @Test public void testValidInput() { - ScannerConfiguration config; + SnapshotScanner.Opts config; config = parseArgs(""); Assert.assertEquals(RowColumn.EMPTY, config.getSpan().getStart()); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java deleted file mode 100644 index 266df69..0000000 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/ColumnIteratorImpl.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.core.impl; - -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.core.util.ByteUtil; - -/** - * Implementation of Column Iterator - */ -public class ColumnIteratorImpl implements ColumnIterator { - - private Iterator<Entry<Key, Value>> scanner; - private Entry<Key, Value> firstEntry; - - ColumnIteratorImpl(Iterator<Entry<Key, Value>> scanner) { - this(null, scanner); - } - - ColumnIteratorImpl(Entry<Key, Value> firstEntry, Iterator<Entry<Key, Value>> cols) { - this.firstEntry = firstEntry; - this.scanner = cols; - } - - @Override - public boolean hasNext() { - return firstEntry != null || scanner.hasNext(); - } - - // TODO create custom class to return instead of entry - @Override - public Entry<Column, Bytes> next() { - Entry<Key, Value> entry; - if (firstEntry != null) { - entry = firstEntry; - firstEntry = null; - } else { - entry = scanner.next(); - } - final Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData()); - final Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData()); - final Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData()); - - final Column col = new Column(cf, cq, cv); - final Bytes val = Bytes.of(entry.getValue().get()); - - return new Entry<Column, Bytes>() { - - @Override - public Bytes setValue(Bytes value) { 
- throw new UnsupportedOperationException(); - } - - @Override - public Bytes getValue() { - return val; - } - - @Override - public Column getKey() { - return col; - } - }; - } - - @Override - public void remove() { - scanner.remove(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java deleted file mode 100644 index efdefeb..0000000 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/RowIteratorImpl.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.core.impl; - -import java.util.Iterator; -import java.util.Map.Entry; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; - -/** - * Implementation of RowIterator - */ -public class RowIteratorImpl implements RowIterator { - - private final org.apache.accumulo.core.client.RowIterator rowIter; - - RowIteratorImpl(Iterator<Entry<Key, Value>> scanner) { - rowIter = new org.apache.accumulo.core.client.RowIterator(scanner); - } - - @Override - public boolean hasNext() { - return rowIter.hasNext(); - } - - // TODO create custom class to return instead of entry - @Override - public Entry<Bytes, ColumnIterator> next() { - Iterator<Entry<Key, Value>> cols = rowIter.next(); - - Entry<Key, Value> entry = cols.next(); - - final Bytes row = Bytes.of(entry.getKey().getRowData().toArray()); - final ColumnIterator coliter = new ColumnIteratorImpl(entry, cols); - - return new Entry<Bytes, ColumnIterator>() { - - @Override - public Bytes getKey() { - return row; - } - - @Override - public ColumnIterator getValue() { - return coliter; - } - - @Override - public ColumnIterator setValue(ColumnIterator value) { - throw new UnsupportedOperationException(); - } - }; - - } - - @Override - public void remove() { - rowIter.remove(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java index 2b9959d..cd2e008 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java +++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java @@ -16,12 +16,13 @@ package org.apache.fluo.core.impl; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Set; +import com.google.common.collect.ImmutableSet; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; @@ -30,7 +31,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.fluo.accumulo.iterators.SnapshotIterator; import org.apache.fluo.accumulo.util.ColumnConstants; -import org.apache.fluo.api.config.ScannerConfiguration; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; @@ -41,45 +41,41 @@ import org.apache.fluo.core.util.UtilWaitThread; /** * Allows users to iterate over entries of a {@link org.apache.fluo.api.client.Snapshot} */ -public class SnapshotScanner implements Iterator<Entry<Key, Value>> { +public class SnapshotScanner implements Iterable<Entry<Key, Value>> { + + /** + * Immutable options for a SnapshotScanner + */ + public static final class Opts { + private final Span span; + private final Collection<Column> columns; + + public Opts(Span span, Collection<Column> columns) { + this.span = span; + this.columns = ImmutableSet.copyOf(columns); + } + + public Span getSpan() { + return span; + } + + public Collection<Column> getColumns() { + return columns; + } + } private final long startTs; private final Environment env; private final TxStats stats; - - private Iterator<Entry<Key, Value>> iterator; - private Entry<Key, Value> next; - private ScannerConfiguration config; + private final Opts config; static final long INITIAL_WAIT_TIME = 50; // TODO make configurable static final long MAX_WAIT_TIME = 60000; - 
public SnapshotScanner(Environment env, ScannerConfiguration config, long startTs, TxStats stats) { - this.env = env; - this.config = config; - this.startTs = startTs; - this.stats = stats; - setUpIterator(); - } - private void setUpIterator() { - Scanner scanner; - try { - scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations()); - } catch (TableNotFoundException e) { - throw new RuntimeException(e); - } - scanner.clearColumns(); - scanner.clearScanIterators(); - scanner.setRange(SpanUtil.toRange(config.getSpan())); - - setupScanner(scanner, config.getColumns(), startTs); - this.iterator = scanner.iterator(); - } - - static void setupScanner(ScannerBase scanner, Set<Column> columns, long startTs) { + static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs) { for (Column col : columns) { if (col.isQualifierSet()) { scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier())); @@ -93,119 +89,153 @@ public class SnapshotScanner implements Iterator<Entry<Key, Value>> { scanner.addScanIterator(iterConf); } - @Override - public boolean hasNext() { - if (next == null) { - next = getNext(); + private class SnapIter implements Iterator<Entry<Key, Value>> { + + private Iterator<Entry<Key, Value>> iterator; + private Entry<Key, Value> next; + private Opts snapIterConfig; + + SnapIter(Opts config) { + this.snapIterConfig = config; + setUpIterator(); } - return next != null; - } + private void setUpIterator() { + Scanner scanner; + try { + scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations()); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + scanner.clearColumns(); + scanner.clearScanIterators(); + scanner.setRange(SpanUtil.toRange(snapIterConfig.getSpan())); - @Override - public Entry<Key, Value> next() { - if (!hasNext()) { - throw new NoSuchElementException(); + setupScanner(scanner, snapIterConfig.getColumns(), startTs); + + 
this.iterator = scanner.iterator(); } - Entry<Key, Value> tmp = next; - next = null; - return tmp; - } + @Override + public boolean hasNext() { + if (next == null) { + next = getNext(); + } - private void resetScanner(Span span) { - try { - config = (ScannerConfiguration) config.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); + return next != null; } - config.setSpan(span); - setUpIterator(); - } + @Override + public Entry<Key, Value> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Entry<Key, Value> tmp = next; + next = null; + return tmp; + } - public void resolveLock(Entry<Key, Value> lockEntry) { + private void resetScanner(Span span) { + snapIterConfig = new Opts(span, snapIterConfig.columns); + setUpIterator(); + } - // read ahead a little bit looking for other locks to resolve + public void resolveLock(Entry<Key, Value> lockEntry) { - long startTime = System.currentTimeMillis(); - long waitTime = INITIAL_WAIT_TIME; + // read ahead a little bit looking for other locks to resolve - List<Entry<Key, Value>> locks = new ArrayList<>(); - locks.add(lockEntry); - int amountRead = 0; - int numRead = 0; + long startTime = System.currentTimeMillis(); + long waitTime = INITIAL_WAIT_TIME; - RowColumn origEnd = config.getSpan().getEnd(); - boolean isEndInclusive = config.getSpan().isEndInclusive(); + List<Entry<Key, Value>> locks = new ArrayList<>(); + locks.add(lockEntry); + int amountRead = 0; + int numRead = 0; - while (true) { - while (iterator.hasNext()) { - Entry<Key, Value> entry = iterator.next(); + RowColumn origEnd = snapIterConfig.getSpan().getEnd(); + boolean isEndInclusive = snapIterConfig.getSpan().isEndInclusive(); - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + while (true) { + while (iterator.hasNext()) { + Entry<Key, Value> entry = iterator.next(); - if (colType == ColumnConstants.LOCK_PREFIX) { - locks.add(entry); - } + long colType = 
entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - amountRead += entry.getKey().getSize() + entry.getValue().getSize(); - numRead++; + if (colType == ColumnConstants.LOCK_PREFIX) { + locks.add(entry); + } - if (numRead > 100 || amountRead > 1 << 12) { - break; + amountRead += entry.getKey().getSize() + entry.getValue().getSize(); + numRead++; + + if (numRead > 100 || amountRead > 1 << 12) { + break; + } } - } - boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime); + boolean resolvedLocks = LockResolver.resolveLocks(env, startTs, stats, locks, startTime); - if (!resolvedLocks) { - UtilWaitThread.sleep(waitTime); - stats.incrementLockWaitTime(waitTime); - waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2); + if (!resolvedLocks) { + UtilWaitThread.sleep(waitTime); + stats.incrementLockWaitTime(waitTime); + waitTime = Math.min(MAX_WAIT_TIME, waitTime * 2); - RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey()); - RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following(); + RowColumn start = SpanUtil.toRowColumn(locks.get(0).getKey()); + RowColumn end = SpanUtil.toRowColumn(locks.get(locks.size() - 1).getKey()).following(); - resetScanner(new Span(start, true, end, false)); + resetScanner(new Span(start, true, end, false)); - locks.clear(); + locks.clear(); - } else { - break; + } else { + break; + } } - } - RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey()); + RowColumn start = SpanUtil.toRowColumn(lockEntry.getKey()); - resetScanner(new Span(start, true, origEnd, isEndInclusive)); - } + resetScanner(new Span(start, true, origEnd, isEndInclusive)); + } - public Entry<Key, Value> getNext() { - mloop: while (true) { - // its possible a next could exist then be rolled back - if (!iterator.hasNext()) { - return null; - } + public Entry<Key, Value> getNext() { + mloop: while (true) { + // its possible a next could exist then be rolled back + if (!iterator.hasNext()) { + return null; 
+ } - Entry<Key, Value> entry = iterator.next(); + Entry<Key, Value> entry = iterator.next(); - long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; + long colType = entry.getKey().getTimestamp() & ColumnConstants.PREFIX_MASK; - if (colType == ColumnConstants.LOCK_PREFIX) { - resolveLock(entry); - continue mloop; - } else if (colType == ColumnConstants.DATA_PREFIX) { - stats.incrementEntriesReturned(1); - return entry; - } else { - throw new IllegalArgumentException("Unexpected column type " + colType); + if (colType == ColumnConstants.LOCK_PREFIX) { + resolveLock(entry); + continue mloop; + } else if (colType == ColumnConstants.DATA_PREFIX) { + stats.incrementEntriesReturned(1); + return entry; + } else { + throw new IllegalArgumentException("Unexpected column type " + colType); + } } } + + @Override + public void remove() { + iterator.remove(); + } + } + + SnapshotScanner(Environment env, Opts config, long startTs, TxStats stats) { + this.env = env; + this.config = config; + this.startTs = startTs; + this.stats = stats; } @Override - public void remove() { - iterator.remove(); + public Iterator<Entry<Key, Value>> iterator() { + return new SnapIter(config); } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index 3b0b85c..c4c429e 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -29,6 +29,7 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; 
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -53,22 +54,23 @@ import org.apache.fluo.accumulo.util.ColumnConstants; import org.apache.fluo.accumulo.values.DelLockValue; import org.apache.fluo.accumulo.values.LockValue; import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.config.ScannerConfiguration; +import org.apache.fluo.api.client.scanner.ScannerBuilder; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; import org.apache.fluo.api.exceptions.AlreadySetException; import org.apache.fluo.api.exceptions.CommitException; import org.apache.fluo.api.exceptions.FluoException; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; import org.apache.fluo.core.async.AsyncCommitObserver; import org.apache.fluo.core.async.AsyncConditionalWriter; import org.apache.fluo.core.async.AsyncTransaction; import org.apache.fluo.core.async.SyncCommitObserver; import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException; import org.apache.fluo.core.exceptions.StaleScanException; +import org.apache.fluo.core.impl.scanner.ColumnScannerImpl; +import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl; import org.apache.fluo.core.oracle.Stamp; import org.apache.fluo.core.util.ColumnUtil; import org.apache.fluo.core.util.ConditionalFlutation; @@ -207,36 +209,46 @@ public class TransactionImpl implements AsyncTransaction, Snapshot { return ret; } - @Override - public RowIterator get(ScannerConfiguration config) { - checkIfOpen(); - return getImpl(config); - } - private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) { // TODO push visibility filtering to server side? 
env.getSharedResources().getVisCache().validate(columns); - ScannerConfiguration config = new ScannerConfiguration(); - config.setSpan(Span.exact(row)); + boolean shouldCopy = false; + for (Column column : columns) { - config.fetchColumn(column.getFamily(), column.getQualifier()); + if (column.isVisibilitySet()) { + shouldCopy = true; + } } - RowIterator iter = getImpl(config); + SnapshotScanner.Opts opts; + if (shouldCopy) { + HashSet<Column> cols = new HashSet<Column>(); + for (Column column : columns) { + if (column.isVisibilitySet()) { + cols.add(new Column(column.getFamily(), column.getQualifier())); + } else { + cols.add(column); + } + } + opts = new SnapshotScanner.Opts(Span.exact(row), columns); + } else { + opts = new SnapshotScanner.Opts(Span.exact(row), columns); + } Map<Column, Bytes> ret = new HashMap<>(); - while (iter.hasNext()) { - Entry<Bytes, ColumnIterator> entry = iter.next(); - ColumnIterator citer = entry.getValue(); - while (citer.hasNext()) { - Entry<Column, Bytes> centry = citer.next(); - if (columns.contains(centry.getKey())) { - ret.put(centry.getKey(), centry.getValue()); + Iterable<ColumnValue> scanner = + Iterables.transform(new SnapshotScanner(env, opts, startTs, stats), ColumnScannerImpl.E2CV); + for (ColumnValue cv : scanner) { + if (shouldCopy) { + if (columns.contains(cv.getColumn())) { + ret.put(cv.getColumn(), cv.getValue()); } + } else { + ret.put(cv.getColumn(), cv.getValue()); } } @@ -246,8 +258,10 @@ public class TransactionImpl implements AsyncTransaction, Snapshot { return ret; } - private RowIterator getImpl(ScannerConfiguration config) { - return new RowIteratorImpl(new SnapshotScanner(this.env, config, startTs, stats)); + @Override + public ScannerBuilder scanner() { + checkIfOpen(); + return new ScannerBuilderImpl(this); } private void updateColumnsRead(Bytes row, Set<Column> columns) { @@ -1193,4 +1207,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot { cd.commitObserver.committed(); } + public 
SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) { + return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns), startTs, stats); + } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java new file mode 100644 index 0000000..06e68d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/CellScannerImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.core.impl.scanner; + +import java.util.Iterator; +import java.util.Map.Entry; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.core.util.ByteUtil; + +public class CellScannerImpl implements CellScanner { + + private Iterable<Entry<Key, Value>> snapshot; + + private static final Function<Entry<Key, Value>, RowColumnValue> E2RCV = + new Function<Entry<Key, Value>, RowColumnValue>() { + @Override + public RowColumnValue apply(Entry<Key, Value> entry) { + Bytes row = ByteUtil.toBytes(entry.getKey().getRowData()); + Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData()); + Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData()); + Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData()); + Column col = new Column(cf, cq, cv); + Bytes val = Bytes.of(entry.getValue().get()); + return new RowColumnValue(row, col, val); + } + }; + + CellScannerImpl(Iterable<Entry<Key, Value>> snapshot) { + this.snapshot = snapshot; + } + + @Override + public Iterator<RowColumnValue> iterator() { + return Iterators.transform(snapshot.iterator(), E2RCV); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java new file mode 100644 index 0000000..c85dfeb --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ColumnScannerImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.core.impl.scanner; + +import java.util.Iterator; +import java.util.Map.Entry; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.core.util.ByteUtil; + +public class ColumnScannerImpl implements ColumnScanner { + + public static final Function<Entry<Key, Value>, ColumnValue> E2CV = + new Function<Entry<Key, Value>, ColumnValue>() { + @Override + public ColumnValue apply(Entry<Key, Value> entry) { + Bytes cf = ByteUtil.toBytes(entry.getKey().getColumnFamilyData()); + Bytes cq = ByteUtil.toBytes(entry.getKey().getColumnQualifierData()); + Bytes cv = ByteUtil.toBytes(entry.getKey().getColumnVisibilityData()); + Column col = new 
Column(cf, cq, cv); + Bytes val = Bytes.of(entry.getValue().get()); + return new ColumnValue(col, val); + } + }; + + private PeekingIterator<Entry<Key, Value>> peekingIter; + private Bytes row; + private Iterator<ColumnValue> iter; + private boolean gotIter = false; + + ColumnScannerImpl(Iterator<Entry<Key, Value>> e) { + peekingIter = Iterators.peekingIterator(e); + row = ByteUtil.toBytes(peekingIter.peek().getKey().getRowData()); + iter = Iterators.transform(peekingIter, E2CV); + } + + @Override + public Iterator<ColumnValue> iterator() { + Preconditions.checkState(!gotIter, + "Unfortunately this implementation only support getting the iterator once"); + gotIter = true; + return iter; + } + + @Override + public Bytes getRow() { + return row; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java new file mode 100644 index 0000000..4a9eb38 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/RowScannerImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.core.impl.scanner; + +import java.util.Iterator; +import java.util.Map.Entry; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; + +public class RowScannerImpl implements RowScanner { + + private Iterable<Entry<Key, Value>> snapshot; + + private static final Function<Iterator<Entry<Key, Value>>, ColumnScanner> RI2CS = + new Function<Iterator<Entry<Key, Value>>, ColumnScanner>() { + @Override + public ColumnScanner apply(Iterator<Entry<Key, Value>> input) { + return new ColumnScannerImpl(input); + } + }; + + RowScannerImpl(Iterable<Entry<Key, Value>> snapshot) { + this.snapshot = snapshot; + } + + @Override + public Iterator<ColumnScanner> iterator() { + RowIterator rowiter = new RowIterator(snapshot.iterator()); + return Iterators.transform(rowiter, RI2CS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java new file mode 100644 index 0000000..8c833d5 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.core.impl.scanner; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.client.scanner.RowScannerBuilder; +import org.apache.fluo.api.client.scanner.ScannerBuilder; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.impl.SnapshotScanner; +import org.apache.fluo.core.impl.TransactionImpl; + +public class ScannerBuilderImpl implements ScannerBuilder { + + private static final Span EMPTY_SPAN = new Span(); + + private TransactionImpl tx; + private Span span = EMPTY_SPAN; + private Collection<Column> columns = Collections.emptyList(); + + public ScannerBuilderImpl(TransactionImpl tx) { + this.tx = tx; + } + + @Override + public ScannerBuilder over(Span span) { + Objects.requireNonNull(span); + this.span = span; + return this; + 
} + + private void setColumns(Collection<Column> columns) { + for (Column column : columns) { + Preconditions.checkArgument(!column.isVisibilitySet(), + "Fetching visibility is not currently supported"); + } + this.columns = columns; + } + + @Override + public ScannerBuilder fetch(Collection<Column> columns) { + Objects.requireNonNull(columns); + setColumns(ImmutableSet.copyOf(columns)); + return this; + } + + @Override + public ScannerBuilder fetch(Column... columns) { + Objects.requireNonNull(columns); + setColumns(ImmutableSet.copyOf(columns)); + return this; + } + + @Override + public CellScanner build() { + SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns); + return new CellScannerImpl(snapScanner); + } + + @Override + public RowScannerBuilder byRow() { + return new RowScannerBuilder() { + @Override + public RowScanner build() { + SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns); + return new RowScannerImpl(snapScanner); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java index da6b8dd..fe5c21d 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java +++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java @@ -23,14 +23,13 @@ import java.util.Set; import com.google.common.base.Function; import com.google.common.collect.Iterators; import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ScannerBuilder; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ScannerConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import 
org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.exceptions.AlreadySetException; import org.apache.fluo.api.exceptions.CommitException; -import org.apache.fluo.api.iterator.RowIterator; import org.apache.fluo.core.async.AsyncCommitObserver; import org.apache.fluo.core.async.AsyncTransaction; import org.apache.fluo.core.impl.Notification; @@ -179,10 +178,10 @@ public class TracingTransaction implements AsyncTransaction, Snapshot { } @Override - public RowIterator get(ScannerConfiguration config) { + public ScannerBuilder scanner() { // TODO log something better (see fluo-425) - log.trace("txid: {} get(ScannerConfiguration)", txid); - return tx.get(config); + log.trace("txid: {} newScanner()", txid); + return tx.scanner(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java b/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java deleted file mode 100644 index f3cc7af..0000000 --- a/modules/core/src/test/java/org/apache/fluo/core/config/ScannerConfigurationTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.core.config; - -import org.apache.fluo.api.config.ScannerConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.Span; -import org.junit.Assert; -import org.junit.Test; - -/** - * Unit test for ScannerConfiguration class - */ -public class ScannerConfigurationTest { - - @Test - public void testSetGet() { - - ScannerConfiguration config = new ScannerConfiguration(); - Assert.assertEquals(new Span(), config.getSpan()); - Assert.assertEquals(0, config.getColumns().size()); - - config = new ScannerConfiguration(); - config.setSpan(Span.exact("row1")); - Assert.assertEquals(Span.exact("row1"), config.getSpan()); - Assert.assertEquals(0, config.getColumns().size()); - - config = new ScannerConfiguration(); - config.fetchColumnFamily(Bytes.of("cf1")); - Assert.assertEquals(1, config.getColumns().size()); - Assert.assertEquals(new Column("cf1"), config.getColumns().iterator().next()); - - config = new ScannerConfiguration(); - config.fetchColumn(Bytes.of("cf2"), Bytes.of("cq2")); - Assert.assertEquals(1, config.getColumns().size()); - Assert.assertEquals(new Column("cf2", "cq2"), config.getColumns().iterator().next()); - - config = new ScannerConfiguration(); - config.fetchColumnFamily(Bytes.of("a")); - config.fetchColumnFamily(Bytes.of("b")); - config.fetchColumnFamily(Bytes.of("a")); - Assert.assertEquals(2, config.getColumns().size()); - - config.clearColumns(); - Assert.assertEquals(0, config.getColumns().size()); - } - - @Test - public void 
testNullSet() { - - ScannerConfiguration config = new ScannerConfiguration(); - - try { - config.setSpan(null); - Assert.fail(); - } catch (NullPointerException e) { - } - - try { - config.fetchColumnFamily(null); - Assert.fail(); - } catch (NullPointerException e) { - } - - try { - config.fetchColumn(null, Bytes.of("qual")); - Assert.fail(); - } catch (NullPointerException e) { - } - - try { - config.fetchColumn(Bytes.of("fam"), null); - Assert.fail(); - } catch (NullPointerException e) { - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java index 4cdf146..5e2a7f9 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java @@ -18,7 +18,6 @@ package org.apache.fluo.integration; import java.io.File; import java.util.Collections; import java.util.List; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.client.Connector; @@ -32,11 +31,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.ObserverConfiguration; -import org.apache.fluo.api.config.ScannerConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; +import org.apache.fluo.api.data.RowColumnValue; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -99,18 +94,12 @@ public class ITBase { protected void printSnapshot() throws Exception { try 
(Snapshot s = client.newSnapshot()) { - RowIterator iter = s.get(new ScannerConfiguration()); - System.out.println("== snapshot start =="); - while (iter.hasNext()) { - Entry<Bytes, ColumnIterator> rowEntry = iter.next(); - ColumnIterator citer = rowEntry.getValue(); - while (citer.hasNext()) { - Entry<Column, Bytes> colEntry = citer.next(); - System.out.println(rowEntry.getKey() + " " + colEntry.getKey() + "\t" - + colEntry.getValue()); - } + + for (RowColumnValue rcv : s.scanner().build()) { + System.out.println(rcv.getRow() + " " + rcv.getColumn() + "\t" + rcv.getValue()); } + System.out.println("=== snapshot end ==="); } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java index e2ed47a..3a541d9 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java @@ -77,6 +77,7 @@ public class ITBaseImpl extends ITBase { config.setTransactionRollbackTime(1, TimeUnit.SECONDS); config.addObservers(getObservers()); config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000"); + config.setMiniStartAccumulo(false); try (FluoAdmin admin = FluoFactory.newAdmin(config)) { InitOpts opts = new InitOpts().setClearZookeeper(true).setClearTable(true); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java 
b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java index b64cda5..ceda193 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/TestTransaction.java @@ -31,14 +31,13 @@ import org.apache.fluo.accumulo.iterators.NotificationIterator; import org.apache.fluo.accumulo.util.ColumnConstants; import org.apache.fluo.accumulo.util.NotificationUtil; import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ScannerConfiguration; +import org.apache.fluo.api.client.scanner.ScannerBuilder; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; import org.apache.fluo.api.exceptions.AlreadySetException; import org.apache.fluo.api.exceptions.CommitException; -import org.apache.fluo.api.iterator.RowIterator; import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException; import org.apache.fluo.core.impl.Environment; import org.apache.fluo.core.impl.Notification; @@ -205,8 +204,8 @@ public class TestTransaction implements TransactionBase { } @Override - public RowIterator get(ScannerConfiguration config) { - return tx.get(config); + public ScannerBuilder scanner() { + return tx.scanner(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java index 321374b..086f1d9 100644 --- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java @@ -29,15 +29,14 @@ import 
org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.ObserverConfiguration; -import org.apache.fluo.api.config.ScannerConfiguration; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; import org.apache.fluo.api.exceptions.CommitException; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; import org.apache.fluo.api.observer.AbstractObserver; import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException; import org.apache.fluo.core.impl.Environment; @@ -469,14 +468,11 @@ public class FluoIT extends ITBaseImpl { tx3.done(); HashSet<Column> columns = new HashSet<>(); - RowIterator riter = - tx2.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"), - new Column(Bytes.of("outlink"))))); - while (riter.hasNext()) { - ColumnIterator citer = riter.next().getValue(); - while (citer.hasNext()) { - columns.add(citer.next().getKey()); - } + + CellScanner cellScanner = + tx2.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new Column("outlink")).build(); + for (RowColumnValue rcv : cellScanner) { + columns.add(rcv.getColumn()); } tx2.done(); @@ -490,15 +486,12 @@ public class FluoIT extends ITBaseImpl { TestTransaction tx4 = new TestTransaction(env); columns.clear(); - riter = - tx4.get(new ScannerConfiguration().setSpan(Span.exact(Bytes.of("d00001"), - new Column(Bytes.of("outlink"))))); - while (riter.hasNext()) { - ColumnIterator citer = riter.next().getValue(); - while (citer.hasNext()) { - columns.add(citer.next().getKey()); - } + cellScanner = + tx4.scanner().over(Span.exact(Bytes.of("d00001"))).fetch(new 
Column("outlink")).build(); + for (RowColumnValue rcv : cellScanner) { + columns.add(rcv.getColumn()); } + expected.add(new Column("outlink", "http://z.com")); expected.remove(new Column("outlink", "http://b.com")); Assert.assertEquals(expected, columns); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/23374784/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java ---------------------------------------------------------------------- diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java new file mode 100644 index 0000000..bdf473d --- /dev/null +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ScannerIT.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.integration.impl; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.integration.ITBaseImpl; +import org.junit.Assert; +import org.junit.Test; + +public class ScannerIT extends ITBaseImpl { + + static class ColumnPredicate implements Predicate<RowColumnValue> { + Column c; + + ColumnPredicate(Column c) { + this.c = c; + } + + @Override + public boolean apply(RowColumnValue input) { + return input.getColumn().equals(c); + } + } + + static class FamilyPredicate implements Predicate<RowColumnValue> { + Bytes fam; + + FamilyPredicate(String family) { + this.fam = Bytes.of(family); + } + + @Override + public boolean apply(RowColumnValue input) { + return input.getColumn().getFamily().equals(fam); + } + } + + static class RowPredicate implements Predicate<RowColumnValue> { + Bytes row; + + RowPredicate(String row) { + this.row = Bytes.of(row); + } + + @Override + public boolean apply(RowColumnValue input) { + return input.getRow().equals(row); + } + } + + @Test + public void testFiltering() { + Set<RowColumnValue> expected = genData(); + + HashSet<RowColumnValue> expectedR2 = new HashSet<>(); + Iterables.addAll(expectedR2, Iterables.filter(expected, new RowPredicate("r2"))); + Assert.assertEquals(2, expectedR2.size()); + + + HashSet<RowColumnValue> expectedR2c = new HashSet<>(); + 
Iterables.addAll( + expectedR2c, + Iterables.filter(expected, + Predicates.and(new RowPredicate("r2"), new ColumnPredicate(new Column("f1", "q2"))))); + Assert.assertEquals(1, expectedR2c.size()); + + HashSet<RowColumnValue> expectedC = new HashSet<>(); + Iterables.addAll(expectedC, + Iterables.filter(expected, new ColumnPredicate(new Column("f1", "q1")))); + Assert.assertEquals(2, expectedC.size()); + + HashSet<RowColumnValue> expectedCF = new HashSet<>(); + Iterables.addAll(expectedCF, Iterables.filter(expected, new FamilyPredicate("f2"))); + Assert.assertEquals(2, expectedCF.size()); + + HashSet<RowColumnValue> expectedCols = new HashSet<>(); + Iterables.addAll(expectedCols, Iterables.filter(expected, Predicates.or(new ColumnPredicate( + new Column("f2", "q5")), new ColumnPredicate(new Column("f1", "q1"))))); + Assert.assertEquals(3, expectedCols.size()); + + try (Snapshot snap = client.newSnapshot()) { + HashSet<RowColumnValue> actual = new HashSet<>(); + Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).build()); + Assert.assertEquals(expectedR2, actual); + + actual.clear(); + Iterables.addAll(actual, snap.scanner().over(Span.exact("r2")).fetch(new Column("f1", "q2")) + .build()); + Assert.assertEquals(expectedR2c, actual); + + actual.clear(); + Iterables.addAll(actual, snap.scanner().fetch(new Column("f1", "q1")).build()); + Assert.assertEquals(expectedC, actual); + + actual.clear(); + Iterables.addAll(actual, snap.scanner().fetch(new Column("f2")).build()); + Assert.assertEquals(expectedCF, actual); + + actual.clear(); + Iterables.addAll(actual, snap.scanner().fetch(new Column("f2", "q5"), new Column("f1", "q1")) + .build()); + Assert.assertEquals(expectedCols, actual); + } + + } + + @Test + public void testMultipleIteratorsFromSameRowScanner() { + Set<RowColumnValue> expected = genData(); + + try (Snapshot snap = client.newSnapshot()) { + RowScanner rowScanner = snap.scanner().byRow().build(); + + Iterator<ColumnScanner> iter1 = 
rowScanner.iterator(); + Iterator<ColumnScanner> iter2 = rowScanner.iterator(); + + HashSet<RowColumnValue> actual1 = new HashSet<>(); + HashSet<RowColumnValue> actual2 = new HashSet<>(); + + while (iter1.hasNext()) { + ColumnScanner cs1 = iter1.next(); + + Assert.assertTrue(iter2.hasNext()); + ColumnScanner cs2 = iter2.next(); + + for (ColumnValue cv : cs1) { + actual1.add(new RowColumnValue(cs1.getRow(), cv.getColumn(), cv.getValue())); + } + + for (ColumnValue cv : cs2) { + actual2.add(new RowColumnValue(cs2.getRow(), cv.getColumn(), cv.getValue())); + } + } + + Assert.assertFalse(iter2.hasNext()); + + Assert.assertEquals(expected, actual1); + Assert.assertEquals(expected, actual2); + } + } + + @Test + public void testMultipleIteratorsFromSameIterable() { + + Set<RowColumnValue> expected = genData(); + + try (Snapshot snap = client.newSnapshot()) { + CellScanner cellScanner = snap.scanner().build(); + // grab two iterators from same iterable and iterator over them in interleaved fashion + Iterator<RowColumnValue> iter1 = cellScanner.iterator(); + Iterator<RowColumnValue> iter2 = cellScanner.iterator(); + + HashSet<RowColumnValue> actual1 = new HashSet<>(); + HashSet<RowColumnValue> actual2 = new HashSet<>(); + + while (iter1.hasNext()) { + Assert.assertTrue(iter2.hasNext()); + actual1.add(iter1.next()); + actual2.add(iter2.next()); + } + + Assert.assertFalse(iter2.hasNext()); + + Assert.assertEquals(expected, actual1); + Assert.assertEquals(expected, actual2); + } + } + + private Set<RowColumnValue> genData() { + Set<RowColumnValue> expected = new HashSet<>(); + expected.add(new RowColumnValue("r1", new Column("f1", "q1"), "v1")); + expected.add(new RowColumnValue("r1", new Column("f2", "q3"), "v2")); + expected.add(new RowColumnValue("r2", new Column("f1", "q1"), "v3")); + expected.add(new RowColumnValue("r2", new Column("f1", "q2"), "v4")); + expected.add(new RowColumnValue("r4", new Column("f2", "q5"), "v5")); + + Assert.assertEquals(5, expected.size()); + + 
try (Transaction tx = client.newTransaction()) { + for (RowColumnValue rcv : expected) { + tx.set(rcv.getRow(), rcv.getColumn(), rcv.getValue()); + } + tx.commit(); + } + + return expected; + } +}