This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/main by this push: new 4bdb9389 Adds scan time authorizations to transactions/snapshots (#1120) 4bdb9389 is described below commit 4bdb938939801c531589281751677c25cbfd0565 Author: Bill S <w...@users.noreply.github.com> AuthorDate: Sat Jan 14 23:08:09 2023 -0500 Adds scan time authorizations to transactions/snapshots (#1120) 1. Allows a user to specify a collection of authorizations/visibility labels to use when scanning data. Writes are allowed so long as the current connection's authorizations satisfy any visibility expressions. 2. Connection-wide authorizations are set in the fluo configuration and must be set on the underlying Accumulo user. Co-authored-by: Bill Slacum <b...@glidedog.com> --- .../java/org/apache/fluo/api/client/Snapshot.java | 2 + .../org/apache/fluo/api/client/SnapshotBase.java | 20 +++ .../apache/fluo/api/config/FluoConfiguration.java | 23 +++ .../fluo/api/config/SimpleConfiguration.java | 28 ++++ .../java/org/apache/fluo/command/ScanTest.java | 4 +- .../apache/fluo/core/client/FluoClientImpl.java | 3 + .../org/apache/fluo/core/impl/Environment.java | 11 ++ .../fluo/core/impl/ParallelSnapshotScanner.java | 29 +++- .../org/apache/fluo/core/impl/SnapshotScanner.java | 14 +- .../org/apache/fluo/core/impl/TransactionImpl.java | 56 +++++-- .../fluo/core/impl/scanner/ScannerBuilderImpl.java | 22 ++- .../client/FluoClientAuthorizationsIT.java | 170 +++++++++++++++++++++ .../fluo/integration/client/FluoClientIT.java | 29 ++++ 13 files changed, 387 insertions(+), 24 deletions(-) diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java index 625cf626..824a14e2 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/Snapshot.java @@ -15,6 +15,8 @@ package org.apache.fluo.api.client; +import java.util.Collection; + /** * Allows users 
to read from a Fluo table at a certain point in time. Snapshot extends * {@link SnapshotBase} to include a {@link #close} method which must be called when you are diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java index 7c4a2269..ff0a694c 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java @@ -203,4 +203,24 @@ public interface SnapshotBase { default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) { return CompletableFuture.completedFuture(get(row, column, defaultValue)); } + + /** + * All reads done using this snapshot after this call will use the passed in authorizations to + * filter data. + * + * @since 2.0.0 + */ + default void setScanTimeAuthorizations(Collection<String> authorizations) { + throw new UnsupportedOperationException(); + } + + /** + * Returns the set of scan time authorizations that are currently in use for filtering data. The + * empty set indicates no filtering is being done using scan time authorizations. 
+ * + * @since 2.0.0 + */ + default Collection<String> getScanTimeAuthorizations() { + throw new UnsupportedOperationException(); + } } diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java index 3cf944b5..b18b21c3 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java +++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java @@ -151,6 +151,9 @@ public class FluoConfiguration extends SimpleConfiguration { * @since 1.2.0 */ public static final String ACCUMULO_USER_PROP = ACCUMULO_PREFIX + ".user"; + + public static final String ACCUMULO_AUTH_PROP = ACCUMULO_PREFIX + ".auths"; + /** * @since 1.2.0 */ @@ -486,6 +489,7 @@ public class FluoConfiguration extends SimpleConfiguration { return getDepNonEmptyString(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP); } + /** * Sets the Apache Accumulo password property {@value #ACCUMULO_PASSWORD_PROP} * @@ -510,6 +514,25 @@ public class FluoConfiguration extends SimpleConfiguration { throw new NoSuchElementException(ACCUMULO_PASSWORD_PROP + " is not set!"); } + /** + * @since 2.0.0 + */ + public FluoConfiguration setAccumuloAuthorizations(String... 
auths) { + setProperties(ACCUMULO_AUTH_PROP, auths); + return this; + } + + /** + * @since 2.0.0 + */ + public String[] getAccumuloAuthorizations() { + if (containsKey(ACCUMULO_AUTH_PROP)) { + return this.getProperties(ACCUMULO_AUTH_PROP); + } else { + return new String[0]; + } + } + /** * Sets the value of the property {@value #ACCUMULO_ZOOKEEPERS_PROP} * diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java index b48dd835..88aebb0b 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java +++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java @@ -30,6 +30,7 @@ import java.io.StringReader; import java.io.UncheckedIOException; import java.io.Writer; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -242,6 +243,33 @@ public class SimpleConfiguration implements Serializable { internalConfig.setProperty(key, value); } + /** + * @since 2.0.0 + */ + public void setProperties(String key, String... values) { + Objects.requireNonNull(values, "Values for key `" + key + "` must be non-null."); + // don't let callers modify the array underneath of us + String[] copy = new String[values.length]; + System.arraycopy(values, 0, copy, 0, copy.length); + for (String value : copy) { + Objects.requireNonNull(value, "Encountered null value for key `" + key + "`."); + } + internalConfig.setProperty(key, values); + } + + /** + * @since 2.0.0 + */ + public String[] getProperties(String key) { + // TODO fix cast class; use Properties? + ArrayList<String> values = (ArrayList<String>) internalConfig.getProperty(key); + if (values == null) { + return new String[0]; + } else { + return values.toArray(new String[values.size()]); + } + } + /** * Returns a subset of config that start with given prefix. 
The prefix will not be present in keys * of the returned config. Any changes made to the returned config will be made to this and visa diff --git a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java index fabc3aaf..f7f1dd3f 100644 --- a/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java +++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java @@ -16,6 +16,7 @@ package org.apache.fluo.command; import com.beust.jcommander.JCommander; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.Span; @@ -34,7 +35,8 @@ public class ScanTest { JCommander jcommand = new JCommander(scan); jcommand.parse(args.split(" ")); ScanUtil.ScanOpts opts = scan.getScanOpts(); - return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false); + return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts), false, + Authorizations.EMPTY); } @Test diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java index 22749900..797f7ffb 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java @@ -15,11 +15,14 @@ package org.apache.fluo.core.client; +import java.util.Collection; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.LoaderExecutor; import org.apache.fluo.api.client.Snapshot; diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java index f9a23162..9c35743e 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java @@ -18,10 +18,14 @@ package org.apache.fluo.core.impl; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -102,6 +106,13 @@ public class Environment implements AutoCloseable { + client.instanceOperations().getInstanceId() + " != " + accumuloInstanceID); } + String[] auths = config.getAccumuloAuthorizations(); + if (auths.length == 0) { + this.auths = new Authorizations(); + } else { + this.auths = new Authorizations(auths); + } + try { resources = new SharedResources(this); } catch (TableNotFoundException e1) { diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java index 4d2b37ce..17f4620f 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/ParallelSnapshotScanner.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import 
org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; @@ -56,9 +57,11 @@ public class ParallelSnapshotScanner { private Map<Bytes, Set<Column>> readLocksSeen; private Consumer<Entry<Key, Value>> writeLocksSeen; + private Authorizations authorizations; + ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env, long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen, - Consumer<Entry<Key, Value>> writeLocksSeen) { + Consumer<Entry<Key, Value>> writeLocksSeen, Authorizations authorizations) { this.rows = rows; this.columns = columns; this.env = env; @@ -68,19 +71,33 @@ public class ParallelSnapshotScanner { this.columnConverter = new CachedColumnConverter(columns); this.readLocksSeen = readLocksSeen; this.writeLocksSeen = writeLocksSeen; + this.authorizations = authorizations; + } + + ParallelSnapshotScanner(Collection<Bytes> rows, Set<Column> columns, Environment env, + long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen, + Consumer<Entry<Key, Value>> writeLocksSeen) { + this(rows, columns, env, startTs, stats, readLocksSeen, writeLocksSeen, + env.getAuthorizations()); } ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats, Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen) { + this(cells, env, startTs, stats, readLocksSeen, writeLocksSeen, env.getAuthorizations()); + } + + ParallelSnapshotScanner(Collection<RowColumn> cells, Environment env, long startTs, TxStats stats, + Map<Bytes, Set<Column>> readLocksSeen, Consumer<Entry<Key, Value>> writeLocksSeen, + Authorizations authorizations) { for (RowColumn rc : cells) { byte[] r = rc.getRow().toArray(); byte[] cf = rc.getColumn().getFamily().toArray(); byte[] cq = rc.getColumn().getQualifier().toArray(); - byte[] cv = rc.getColumn().getVisibility().toArray(); + byte[] 
cv = new byte[0]; + byte[] cv2 = new byte[] {(byte) 0xff}; Key start = new Key(r, cf, cq, cv, Long.MAX_VALUE, false, false); - Key end = new Key(start); - end.setTimestamp(Long.MIN_VALUE); + Key end = new Key(r, cf, cq, cv2, Long.MIN_VALUE, false, false); rangesToScan.add(new Range(start, true, end, true)); } @@ -92,6 +109,7 @@ public class ParallelSnapshotScanner { this.columnConverter = ColumnUtil::convert; this.readLocksSeen = readLocksSeen; this.writeLocksSeen = writeLocksSeen; + this.authorizations = authorizations; } private BatchScanner setupBatchScanner() { @@ -100,8 +118,7 @@ public class ParallelSnapshotScanner { try { // TODO hardcoded number of threads! // one thread is probably good.. going for throughput - scanner = - env.getAccumuloClient().createBatchScanner(env.getTable(), env.getAuthorizations(), 1); + scanner = env.getAccumuloClient().createBatchScanner(env.getTable(), this.authorizations, 1); } catch (TableNotFoundException e) { throw new RuntimeException(e); } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java index 3defc897..71c46d27 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SnapshotScanner.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.accumulo.iterators.SnapshotIterator; import org.apache.fluo.accumulo.util.ColumnType; import org.apache.fluo.api.data.Column; @@ -51,10 +52,14 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> { private final Collection<Column> columns; private final boolean showReadLocks; - public Opts(Span span, Collection<Column> columns, 
boolean showReadLocks) { + private final Authorizations scanTimeAuthz; + + public Opts(Span span, Collection<Column> columns, boolean showReadLocks, + Authorizations scanTimeAuthz) { this.span = span; this.columns = ImmutableSet.copyOf(columns); this.showReadLocks = showReadLocks; + this.scanTimeAuthz = scanTimeAuthz; } public Span getSpan() { @@ -111,7 +116,9 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> { private void setUpIterator() { Scanner scanner; try { - scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations()); + scanner = env.getAccumuloClient().createScanner(env.getTable(), + snapIterConfig.scanTimeAuthz == null ? env.getAuthorizations() + : snapIterConfig.scanTimeAuthz); } catch (TableNotFoundException e) { throw new RuntimeException(e); } @@ -145,7 +152,8 @@ public class SnapshotScanner implements Iterable<Entry<Key, Value>> { } private void resetScanner(Span span) { - snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks); + snapIterConfig = new Opts(span, snapIterConfig.columns, snapIterConfig.showReadLocks, + snapIterConfig.scanTimeAuthz); setUpIterator(); } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index 8c32aec3..6b6f37fe 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -15,6 +15,7 @@ package org.apache.fluo.core.impl; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,6 +32,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Preconditions; @@ -50,6 +52,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.accumulo.iterators.PrewriteIterator; import org.apache.fluo.accumulo.util.ColumnConstants; import org.apache.fluo.accumulo.util.ColumnType; @@ -140,11 +143,14 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra private boolean commitAttempted = false; private AsyncReader asyncReader = null; + private Authorizations scanTimeAuthz; - public TransactionImpl(Environment env, Notification trigger, long startTs) { + public TransactionImpl(Environment env, Notification trigger, long startTs, + Authorizations scanTimeAuthz) { Objects.requireNonNull(env, "environment cannot be null"); Preconditions.checkArgument(startTs >= 0, "startTs cannot be negative"); this.env = env; + this.scanTimeAuthz = Objects.requireNonNull(scanTimeAuthz); this.stats = new TxStats(env); this.startTs = startTs; this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG); @@ -164,15 +170,19 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra } public TransactionImpl(Environment env, Notification trigger) { - this(env, trigger, allocateTimestamp(env).getTxTimestamp()); + this(env, trigger, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations()); } public TransactionImpl(Environment env) { - this(env, null, allocateTimestamp(env).getTxTimestamp()); + this(env, null, allocateTimestamp(env).getTxTimestamp(), env.getAuthorizations()); } public TransactionImpl(Environment env, long startTs) { - this(env, null, startTs); + this(env, null, startTs, env.getAuthorizations()); + } + + public TransactionImpl(Environment env, Authorizations scanTimeAuthz) { + this(env, null, 
allocateTimestamp(env).getTxTimestamp(), scanTimeAuthz); } private static Stamp allocateTimestamp(Environment env) { @@ -204,7 +214,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rows, columns, env, startTs, stats, readLocksSeen, kve -> { - }); + }, this.scanTimeAuthz); Map<Bytes, Map<Column, Bytes>> ret = pss.scan(); @@ -246,9 +256,9 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra cols.add(column); } } - opts = new SnapshotScanner.Opts(Span.exact(row), columns, true); + opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz); } else { - opts = new SnapshotScanner.Opts(Span.exact(row), columns, true); + opts = new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz); } Map<Column, Bytes> ret = new HashMap<>(); @@ -283,8 +293,8 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra return Collections.emptyMap(); } - ParallelSnapshotScanner pss = - new ParallelSnapshotScanner(rowColumns, env, startTs, stats, readLocksSeen, writeLocksSeen); + ParallelSnapshotScanner pss = new ParallelSnapshotScanner(rowColumns, env, startTs, stats, + readLocksSeen, writeLocksSeen, this.scanTimeAuthz); Map<Bytes, Map<Column, Bytes>> scan = pss.scan(); Map<RowColumn, Bytes> ret = new HashMap<>(); @@ -329,7 +339,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra @Override public ScannerBuilder scanner() { checkIfOpen(); - return new ScannerBuilderImpl(this); + return new ScannerBuilderImpl(this, this.scanTimeAuthz); } private void updateColumnsRead(Bytes row, Set<Column> columns) { @@ -1386,6 +1396,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra @Override public Collection<Mutation> createMutations(CommitData cd) { + long commitTs = getStats().getCommitTs(); ArrayList<Mutation> mutations = new 
ArrayList<>(updates.size() + 1); for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) { @@ -1569,7 +1580,28 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra }); } - public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns) { - return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false), startTs, stats); + public SnapshotScanner newSnapshotScanner(Span span, Collection<Column> columns, + Authorizations scanTimeAuthz) { + return new SnapshotScanner(env, new SnapshotScanner.Opts(span, columns, false, scanTimeAuthz), + startTs, stats); + } + + @Override + public void setScanTimeAuthorizations(Collection<String> labels) { + Objects.requireNonNull(labels, "Authorization tokens must not be null!"); + String[] requestedAuthz = Iterables.toArray(labels, String.class); + if (requestedAuthz != null) { + if (requestedAuthz.length == 0) { + this.scanTimeAuthz = Authorizations.EMPTY; + } else { + this.scanTimeAuthz = new Authorizations(requestedAuthz); + } + } + } + + @Override + public Collection<String> getScanTimeAuthorizations() { + return this.scanTimeAuthz.getAuthorizations().stream() + .map(auth -> new String(auth, StandardCharsets.UTF_8)).collect(Collectors.toSet()); } } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java index 420fb738..b0f4e163 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/scanner/ScannerBuilderImpl.java @@ -21,6 +21,7 @@ import java.util.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.client.scanner.CellScanner; import org.apache.fluo.api.client.scanner.RowScannerBuilder; import 
org.apache.fluo.api.client.scanner.ScannerBuilder; @@ -37,10 +38,17 @@ public class ScannerBuilderImpl implements ScannerBuilder { private Span span = EMPTY_SPAN; private Collection<Column> columns = Collections.emptyList(); + private Authorizations scanTimeAuthz = Authorizations.EMPTY; + public ScannerBuilderImpl(TransactionImpl tx) { this.tx = tx; } + public ScannerBuilderImpl(TransactionImpl tx, Authorizations scanTimeAuthz) { + this.tx = tx; + this.scanTimeAuthz = scanTimeAuthz; + } + @Override public ScannerBuilder over(Span span) { Objects.requireNonNull(span); @@ -70,16 +78,26 @@ public class ScannerBuilderImpl implements ScannerBuilder { return this; } + public ScannerBuilder withLabels(Collection<String> authLables) { + Objects.requireNonNull(authLables); + if (authLables.isEmpty()) { + this.scanTimeAuthz = Authorizations.EMPTY; + } else { + this.scanTimeAuthz = new Authorizations(authLables.toArray(new String[authLables.size()])); + } + return this; + } + @Override public CellScanner build() { - SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns); + SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz); return new CellScannerImpl(snapScanner, columns); } @Override public RowScannerBuilder byRow() { return () -> { - SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns); + SnapshotScanner snapScanner = tx.newSnapshotScanner(span, columns, scanTimeAuthz); return new RowScannerImpl(snapScanner, columns); }; } diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java new file mode 100644 index 00000000..ce37dbbf --- /dev/null +++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientAuthorizationsIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.integration.client; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.exceptions.CommitException; +import org.apache.fluo.core.util.AccumuloUtil; +import org.apache.fluo.integration.ITBaseImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FluoClientAuthorizationsIT extends ITBaseImpl { + @Rule + public Timeout globalTimeout = Timeout.seconds(getTestTimeout()); + + Column ssn = new Column("", "ssn", "PRIVATE"); + Column name = new Column("", "name", "PUBLIC"); + Column 
id = new Column("", "id"); + FluoConfiguration conf; + FluoClient client; + + @Before + public void setupAuthorizations() throws Throwable { + try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) { + accumulo.securityOperations().changeUserAuthorizations(config.getAccumuloUser(), + new Authorizations("PRIVATE", "PUBLIC")); + } + this.conf = new FluoConfiguration(config); + this.conf.setAccumuloAuthorizations("PRIVATE", "PUBLIC"); + this.client = FluoFactory.newClient(this.conf); + + writeSampleData(); + } + + /* + * Kind of sloppy because we assert some basic write functionality already works. However, this + * lets us re-use and organize tests that read this data better. + */ + public void writeSampleData() { + try (Transaction txn = client.newTransaction()) { + txn.set("bill", ssn, "000-00-0001"); + txn.set("bill", name, "william"); + txn.set("bill", id, "1"); + txn.set("bob", ssn, "000-00-0002"); + txn.set("bob", name, "robert"); + txn.set("bob", id, "2"); + txn.commit(); + } + } + + @After + public void cleanupClient() throws Throwable { + this.client.close(); + } + + @Test + public void testBasicRead() { + try (Snapshot snapshot = client.newSnapshot()) { + assertEquals(ImmutableSet.of("PUBLIC", "PRIVATE"), snapshot.getScanTimeAuthorizations()); + Map<Column, String> bill = snapshot.gets("bill"); + assertTrue(bill.containsKey(name)); + assertTrue(bill.containsKey(ssn)); + assertTrue(bill.containsKey(id)); + Map<Column, String> bob = snapshot.gets("bill"); + assertTrue(bob.containsKey(name)); + assertTrue(bob.containsKey(ssn)); + assertTrue(bob.containsKey(id)); + } + } + + @Test + public void testPublicRead() { + try (Snapshot snapshot = client.newSnapshot()) { + snapshot.setScanTimeAuthorizations(ImmutableList.of("PUBLIC")); + assertEquals(ImmutableSet.of("PUBLIC"), snapshot.getScanTimeAuthorizations()); + Map<Column, String> bill = snapshot.gets("bill"); + assertTrue(bill.containsKey(name)); + assertTrue(bill.containsKey(id)); + assertEquals(2, 
bill.size()); + } + } + + @Test + public void testPrivateRead() { + try (Snapshot snapshot = client.newSnapshot()) { + snapshot.setScanTimeAuthorizations(ImmutableList.of("PRIVATE")); + assertEquals(ImmutableSet.of("PRIVATE"), snapshot.getScanTimeAuthorizations()); + Map<Column, String> bill = snapshot.gets("bill"); + assertTrue(bill.containsKey(ssn)); + assertTrue(bill.containsKey(id)); + assertEquals(2, bill.size()); + } + } + + // had some initial uses where I checked for Authorizations.EMPTY instead of null + // or empty set of auths, which caused the underlying scanner to scan at max + // authorizations. I want this call to explicitly say "only read data that is + // unlabeled" + @Test + public void testScanningWithNoAuths() { + try (Snapshot snapshot = client.newSnapshot()) { + snapshot.setScanTimeAuthorizations(Collections.emptySet()); + assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations()); + Map<Column, String> bill = snapshot.gets("bill"); + assertFalse(bill.containsKey(name)); + assertFalse(bill.containsKey(ssn)); + assertTrue(bill.containsKey(id)); + Map<Column, String> bob = snapshot.gets("bob"); + assertFalse(bob.containsKey(name)); + assertFalse(bob.containsKey(ssn)); + assertTrue(bob.containsKey(id)); + } + + // create a client with config that does not have any auths set. This tests the defaults when + // nothing was set for the client or snapshot. 
+ try (FluoClient fc = FluoFactory.newClient(new FluoConfiguration(config))) { + try (Snapshot snapshot = fc.newSnapshot()) { + assertEquals(Collections.emptySet(), snapshot.getScanTimeAuthorizations()); + Map<Column, String> bill = snapshot.gets("bill"); + assertFalse(bill.containsKey(name)); + assertFalse(bill.containsKey(ssn)); + assertTrue(bill.containsKey(id)); + Map<Column, String> bob = snapshot.gets("bob"); + assertFalse(bob.containsKey(name)); + assertFalse(bob.containsKey(ssn)); + assertTrue(bob.containsKey(id)); + } + } + } + + @Test(expected = CommitException.class) + public void testWriteUnreadable() { + try (Transaction txn = client.newTransaction()) { + txn.set("bill", new Column("", "unreadable", "UNREADABLE"), "value"); + txn.commit(); + } + } +} diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java index da5271ca..ff8f094a 100644 --- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java +++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoClientIT.java @@ -15,15 +15,25 @@ package org.apache.fluo.integration.client; +import com.google.common.collect.ImmutableList; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.exceptions.CommitException; import org.apache.fluo.api.exceptions.FluoException; import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.core.util.AccumuloUtil; import 
org.apache.fluo.integration.ITBaseImpl; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -32,6 +42,15 @@ public class FluoClientIT extends ITBaseImpl { @Rule public Timeout globalTimeout = Timeout.seconds(getTestTimeout()); + + @Before + public void setupAuthorizations() throws Throwable { + try (AccumuloClient accumulo = AccumuloUtil.getClient(config)) { + accumulo.securityOperations().changeUserAuthorizations(config.getAccumuloUser(), + new Authorizations("PRIVATE", "PUBLIC")); + } + } + @Test public void testBasic() { try (FluoClient client = FluoFactory.newClient(config)) { @@ -83,4 +102,14 @@ public class FluoClientIT extends ITBaseImpl { Logger.getLogger(FluoClientImpl.class).setLevel(clientLevel); Logger.getLogger(FluoFactory.class).setLevel(factoryLevel); } + + @Test(expected = CommitException.class) + public void testWriteWithDefaultAuths() throws Throwable { + Column labeledColumn = new Column("data", "private_column", "PRIVATE"); + try (FluoClient client = FluoFactory.newClient(config); + Transaction txn = client.newTransaction()) { + txn.set("bill", labeledColumn, "a value"); + txn.commit(); + } + } }